diff --git a/filebeat/beater/crawler.go b/filebeat/beater/crawler.go index ecb7a8d1bbf..f7d9ace0318 100644 --- a/filebeat/beater/crawler.go +++ b/filebeat/beater/crawler.go @@ -48,9 +48,10 @@ func newCrawler( inputConfigs []*conf.C, beatDone chan struct{}, once bool, + logger *logp.Logger, ) (*crawler, error) { return &crawler{ - log: logp.NewLogger("crawler"), + log: logger.Named("crawler"), inputs: map[uint64]cfgfile.Runner{}, inputsFactory: inputFactory, modulesFactory: module, @@ -79,14 +80,14 @@ func (c *crawler) Start( } if configInputs.Enabled() { - c.inputReloader = cfgfile.NewReloader(pipeline, configInputs) + c.inputReloader = cfgfile.NewReloader(log, pipeline, configInputs) if err := c.inputReloader.Check(c.inputsFactory); err != nil { return fmt.Errorf("creating input reloader failed: %w", err) } } if configModules.Enabled() { - c.modulesReloader = cfgfile.NewReloader(pipeline, configModules) + c.modulesReloader = cfgfile.NewReloader(log, pipeline, configModules) if err := c.modulesReloader.Check(c.modulesFactory); err != nil { return fmt.Errorf("creating module reloader failed: %w", err) } @@ -152,7 +153,7 @@ func (c *crawler) startInput( } func (c *crawler) Stop() { - logp.Info("Stopping Crawler") + c.log.Info("Stopping Crawler") asyncWaitStop := func(stop func()) { c.wg.Add(1) @@ -162,7 +163,7 @@ func (c *crawler) Stop() { }() } - logp.Info("Stopping %d inputs", len(c.inputs)) + c.log.Infof("Stopping %d inputs", len(c.inputs)) // Stop inputs in parallel for id, p := range c.inputs { id, p := id, p @@ -182,7 +183,7 @@ func (c *crawler) Stop() { c.WaitForCompletion() - logp.Info("Crawler stopped") + c.log.Info("Crawler stopped") } func (c *crawler) WaitForCompletion() { diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 5510d576df8..7536730c2c7 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -25,13 +25,11 @@ import ( "strings" "sync" - "github.com/elastic/beats/v7/filebeat/backup" 
"github.com/elastic/beats/v7/filebeat/channel" cfg "github.com/elastic/beats/v7/filebeat/config" "github.com/elastic/beats/v7/filebeat/fileset" _ "github.com/elastic/beats/v7/filebeat/include" "github.com/elastic/beats/v7/filebeat/input" - "github.com/elastic/beats/v7/filebeat/input/filestream/takeover" v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/filebeat/input/v2/compat" "github.com/elastic/beats/v7/filebeat/registrar" @@ -49,7 +47,6 @@ import ( conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/monitoring" - "github.com/elastic/elastic-agent-libs/paths" "github.com/elastic/go-concert/unison" // Add filebeat level processors @@ -75,6 +72,7 @@ type Filebeat struct { done chan struct{} stopOnce sync.Once // wraps the Stop() method pipeline beat.PipelineConnector + logger *logp.Logger } type PluginFactory func(beat.Info, *logp.Logger, statestore.States) []v2.Plugin @@ -89,7 +87,7 @@ func New(plugins PluginFactory) beat.Creator { func newBeater(b *beat.Beat, plugins PluginFactory, rawConfig *conf.C) (beat.Beater, error) { config := cfg.DefaultConfig if err := rawConfig.Unpack(&config); err != nil { - return nil, fmt.Errorf("Error reading config file: %w", err) + return nil, fmt.Errorf("Error reading config file: %w", err) //nolint:staticcheck //Keep old behavior } if err := cfgwarn.CheckRemoved6xSettings( @@ -124,7 +122,7 @@ func newBeater(b *beat.Beat, plugins PluginFactory, rawConfig *conf.C) (beat.Bea } if b.API != nil { - if err = inputmon.AttachHandler(b.API.Router(), b.Info.Monitoring.NamespaceRegistry()); err != nil { + if err = inputmon.AttachHandler(b.API.Router(), nil); err != nil { return nil, fmt.Errorf("failed attach inputs api to monitoring endpoint server: %w", err) } } @@ -132,9 +130,9 @@ func newBeater(b *beat.Beat, plugins PluginFactory, rawConfig *conf.C) (beat.Bea if b.Manager != nil { 
b.Manager.RegisterDiagnosticHook("input_metrics", "Metrics from active inputs.", "input_metrics.json", "application/json", func() []byte { - data, err := inputmon.MetricSnapshotJSON(b.Info.Monitoring.NamespaceRegistry()) + data, err := inputmon.MetricSnapshotJSON(nil) if err != nil { - logp.L().Warnw("Failed to collect input metric snapshot for Agent diagnostics.", "error", err) + b.Info.Logger.Warnw("Failed to collect input metric snapshot for Agent diagnostics.", "error", err) return []byte(err.Error()) } return data @@ -163,11 +161,11 @@ func newBeater(b *beat.Beat, plugins PluginFactory, rawConfig *conf.C) (beat.Bea } // in the `setup` command, log this only as a warning - logp.Warn("Setup called, but no modules enabled.") + b.Info.Logger.Warn("Setup called, but no modules enabled.") } if *once && config.ConfigInput.Enabled() && config.ConfigModules.Enabled() { - return nil, fmt.Errorf("input configs and -once cannot be used together") + return nil, fmt.Errorf("input configs and --once cannot be used together") } if config.IsInputEnabled("stdin") && len(enabledInputs) > 1 { @@ -179,6 +177,7 @@ func newBeater(b *beat.Beat, plugins PluginFactory, rawConfig *conf.C) (beat.Bea config: &config, moduleRegistry: moduleRegistry, pluginFactory: plugins, + logger: b.Info.Logger, } err = fb.setupPipelineLoaderCallback(b) @@ -192,7 +191,7 @@ func newBeater(b *beat.Beat, plugins PluginFactory, rawConfig *conf.C) (beat.Bea // setupPipelineLoaderCallback sets the callback function for loading pipelines during setup. 
func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error { if b.Config.Output.Name() != "elasticsearch" && !b.Manager.Enabled() { - logp.Warn(pipelinesWarning) + fb.logger.Warn(pipelinesWarning) return nil } @@ -225,7 +224,7 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error { newPath := strings.TrimSuffix(origPath, ".yml") _ = fb.config.ConfigModules.SetString("path", -1, newPath) } - modulesLoader := cfgfile.NewReloader(fb.pipeline, fb.config.ConfigModules) + modulesLoader := cfgfile.NewReloader(fb.logger.Named("module.reloader"), fb.pipeline, fb.config.ConfigModules) modulesLoader.Load(modulesFactory) } @@ -238,7 +237,7 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error { // setup. func (fb *Filebeat) loadModulesPipelines(b *beat.Beat) error { if b.Config.Output.Name() != "elasticsearch" { - logp.Warn(pipelinesWarning) + fb.logger.Warn(pipelinesWarning) return nil } @@ -290,7 +289,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { registryMigrator := registrar.NewMigrator(config.Registry) if err := registryMigrator.Run(); err != nil { - logp.Err("Failed to migrate registry file: %+v", err) + fb.logger.Errorf("Failed to migrate registry file: %+v", err) return err } @@ -301,9 +300,9 @@ func (fb *Filebeat) Run(b *beat.Beat) error { cn() }() - stateStore, err := openStateStore(ctx, b.Info, logp.NewLogger("filebeat"), config.Registry) + stateStore, err := openStateStore(ctx, b.Info, fb.logger.Named("filebeat"), config.Registry) if err != nil { - logp.Err("Failed to open state store: %+v", err) + fb.logger.Errorf("Failed to open state store: %+v", err) return err } defer stateStore.Close() @@ -315,7 +314,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { b.OutputConfigReloader = reload.ReloadableFunc(func(r *reload.ConfigWithMeta) error { outCfg := conf.Namespace{} if err := r.Config.Unpack(&outCfg); err != nil || outCfg.Name() != "elasticsearch" { - logp.Err("Failed to unpack the output config: %v", err) + 
fb.logger.Errorf("Failed to unpack the output config: %v", err) return nil } @@ -324,7 +323,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { // See https://github.com/elastic/beats/issues/42815 configCopy, err := conf.NewConfigFrom(outCfg.Config()) if err != nil { - logp.Err("Failed to create a new config from the output config: %v", err) + fb.logger.Errorf("Failed to create a new config from the output config: %v", err) return nil } stateStore.notifier.Notify(configCopy) @@ -332,16 +331,10 @@ func (fb *Filebeat) Run(b *beat.Beat) error { }) } - err = processLogInputTakeOver(stateStore, config) - if err != nil { - logp.Err("Failed to attempt filestream state take over: %+v", err) - return err - } - // Setup registrar to persist state registrar, err := registrar.New(stateStore, finishedLogger, config.Registry.FlushTimeout) if err != nil { - logp.Err("Could not init registrar: %v", err) + fb.logger.Errorf("Could not init registrar: %v", err) return err } @@ -366,7 +359,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { outDone := make(chan struct{}) // outDone closes down all active pipeline connections pipelineConnector := channel.NewOutletFactory(outDone).Create - inputsLogger := logp.NewLogger("input") + inputsLogger := fb.logger.Named("input") v2Inputs := fb.pluginFactory(b.Info, inputsLogger, stateStore) v2InputLoader, err := v2.NewLoader(inputsLogger, v2Inputs, "type", cfg.DefaultType) if err != nil { @@ -380,7 +373,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { // Store needs to be fully configured at this point if err := v2InputLoader.Init(&inputTaskGroup); err != nil { - logp.Err("Failed to initialize the input managers: %v", err) + fb.logger.Errorf("Failed to initialize the input managers: %v", err) return err } @@ -401,12 +394,12 @@ func (fb *Filebeat) Run(b *beat.Beat) error { if b.Config.Output.Name() == "elasticsearch" { pipelineLoaderFactory = newPipelineLoaderFactory(pipelineFactoryCtx, b.Config.Output.Config()) } else { - 
logp.Warn(pipelinesWarning) + fb.logger.Warn(pipelinesWarning) } moduleLoader := fileset.NewFactory(inputLoader, b.Info, pipelineLoaderFactory, config.OverwritePipelines) - crawler, err := newCrawler(inputLoader, moduleLoader, config.Inputs, fb.done, *once) + crawler, err := newCrawler(inputLoader, moduleLoader, config.Inputs, fb.done, *once, fb.logger) if err != nil { - logp.Err("Could not init crawler: %v", err) + fb.logger.Errorf("Could not init crawler: %v", err) return err } @@ -417,7 +410,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { // Start the registrar err = registrar.Start() if err != nil { - return fmt.Errorf("Could not start registrar: %w", err) + return fmt.Errorf("Could not start registrar: %w", err) //nolint:staticcheck //Keep old behavior } // Stopping registrar will write last state @@ -435,31 +428,31 @@ func (fb *Filebeat) Run(b *beat.Beat) error { defer waitEvents.Wait() if config.OverwritePipelines { - logp.Debug("modules", "Existing Ingest pipelines will be updated") + fb.logger.Named("modules").Debug("Existing Ingest pipelines will be updated") } err = crawler.Start(fb.pipeline, config.ConfigInput, config.ConfigModules) if err != nil { crawler.Stop() cancelPipelineFactoryCtx() - return fmt.Errorf("Failed to start crawler: %w", err) + return fmt.Errorf("Failed to start crawler: %w", err) //nolint:staticcheck //Keep old behavior } // If run once, add crawler completion check as alternative to done signal if *once { runOnce := func() { - logp.Info("Running filebeat once. Waiting for completion ...") + fb.logger.Info("Running filebeat once. Waiting for completion ...") crawler.WaitForCompletion() - logp.Info("All data collection completed. 
Shutting down.") } waitFinished.Add(runOnce) } // Register reloadable list of inputs and modules - inputs := cfgfile.NewRunnerList(management.DebugK, inputLoader, fb.pipeline) + inputs := cfgfile.NewRunnerList(management.DebugK, inputLoader, fb.pipeline, fb.logger) b.Registry.MustRegisterInput(inputs) - modules := cfgfile.NewRunnerList(management.DebugK, moduleLoader, fb.pipeline) + modules := cfgfile.NewRunnerList(management.DebugK, moduleLoader, fb.pipeline, fb.logger) var adiscover *autodiscover.Autodiscover if fb.config.Autodiscover != nil { @@ -473,6 +466,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { autodiscover.QueryConfig(), config.Autodiscover, b.Keystore, + fb.logger, ) if err != nil { return err @@ -508,7 +502,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { "Continue shutdown: All enqueued events being published.")) // Wait for either timeout or all events having been ACKed by outputs. if fb.config.ShutdownTimeout > 0 { - logp.Info("Shutdown output timer started. Waiting for max %v.", timeout) + fb.logger.Infof("Shutdown output timer started. Waiting for max %v.", timeout) waitEvents.Add(withLog(waitDuration(timeout), "Continue shutdown: Time out waiting for events being published.")) } else { @@ -534,7 +528,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { // Stop is called on exit to stop the crawling, spooling and registration processes. 
func (fb *Filebeat) Stop() { - logp.Info("Stopping filebeat") + fb.logger.Info("Stopping filebeat") // Stop Filebeat fb.stopOnce.Do(func() { close(fb.done) }) @@ -545,39 +539,13 @@ func newPipelineLoaderFactory(ctx context.Context, esConfig *conf.C) fileset.Pip pipelineLoaderFactory := func() (fileset.PipelineLoader, error) { esClient, err := eslegclient.NewConnectedClient(ctx, esConfig, "Filebeat") if err != nil { - return nil, fmt.Errorf("Error creating Elasticsearch client: %w", err) + return nil, fmt.Errorf("Error creating Elasticsearch client: %w", err) //nolint:staticcheck //Keep old behavior } return esClient, nil } return pipelineLoaderFactory } -// some of the filestreams might want to take over the loginput state -// if their `take_over` flag is set to `true`. -func processLogInputTakeOver(stateStore statestore.States, config *cfg.Config) error { - inputs, err := fetchInputConfiguration(config) - if err != nil { - return fmt.Errorf("Failed to fetch input configuration when attempting take over: %w", err) - } - if len(inputs) == 0 { - return nil - } - - store, err := stateStore.StoreFor("") - if err != nil { - return fmt.Errorf("Failed to access state when attempting take over: %w", err) - } - defer store.Close() - logger := logp.NewLogger("filestream-takeover") - - registryHome := paths.Resolve(paths.Data, config.Registry.Path) - registryHome = filepath.Join(registryHome, "filebeat") - - backuper := backup.NewRegistryBackuper(logger, registryHome) - - return takeover.TakeOverLogInputStates(logger, store, backuper, inputs) -} - // fetches all the defined input configuration available at Filebeat startup including external files. 
func fetchInputConfiguration(config *cfg.Config) (inputs []*conf.C, err error) { if len(config.Inputs) == 0 { diff --git a/filebeat/cmd/modules.go b/filebeat/cmd/modules.go index de76e32c6e6..e443bfb1e19 100644 --- a/filebeat/cmd/modules.go +++ b/filebeat/cmd/modules.go @@ -39,7 +39,7 @@ func buildModulesManager(beat *beat.Beat) (cmd.ModulesManager, error) { return nil, fmt.Errorf("wrong settings for config.modules.path, it is expected to end with *.yml. Got: %s", glob) } - modulesManager, err := cfgfile.NewGlobManager(glob, ".yml", ".disabled") + modulesManager, err := cfgfile.NewGlobManager(glob, ".yml", ".disabled", beat.Info.Logger) if err != nil { return nil, fmt.Errorf("initialization error: %w", err) } diff --git a/heartbeat/beater/heartbeat.go b/heartbeat/beater/heartbeat.go index 227b375ee90..5174952dc14 100644 --- a/heartbeat/beater/heartbeat.go +++ b/heartbeat/beater/heartbeat.go @@ -184,7 +184,7 @@ func (bt *Heartbeat) Run(b *beat.Beat) error { } if bt.config.ConfigMonitors.Enabled() { - bt.monitorReloader = cfgfile.NewReloader(b.Publisher, bt.config.ConfigMonitors) + bt.monitorReloader = cfgfile.NewReloader(b.Info.Logger.Named("module.reload"), b.Publisher, bt.config.ConfigMonitors) defer bt.monitorReloader.Stop() err := bt.RunReloadableMonitors() @@ -285,7 +285,7 @@ func (bt *Heartbeat) RunCentralMgmtMonitors(b *beat.Beat) { return nil }) - inputs := cfgfile.NewRunnerList(management.DebugK, bt.monitorFactory, b.Publisher) + inputs := cfgfile.NewRunnerList(management.DebugK, bt.monitorFactory, b.Publisher, b.Info.Logger) b.Registry.MustRegisterInput(inputs) } @@ -311,6 +311,7 @@ func (bt *Heartbeat) makeAutodiscover(b *beat.Beat) (*autodiscover.Autodiscover, autodiscover.QueryConfig(), bt.config.Autodiscover, b.Keystore, + b.Info.Logger, ) if err != nil { return nil, err diff --git a/libbeat/autodiscover/autodiscover.go b/libbeat/autodiscover/autodiscover.go index 7a5be48fefe..c0b3a63d32d 100644 --- a/libbeat/autodiscover/autodiscover.go +++ 
b/libbeat/autodiscover/autodiscover.go @@ -74,8 +74,9 @@ func NewAutodiscover( configurer EventConfigurer, c *Config, keystore keystore.Keystore, + logger *logp.Logger, ) (*Autodiscover, error) { - logger := logp.NewLogger("autodiscover") + logger = logger.Named("autodiscover") // Init Event bus bus := bus.New(logger, name) @@ -97,7 +98,7 @@ func NewAutodiscover( factory: factory, configurer: configurer, configs: map[string]map[uint64]*reload.ConfigWithMeta{}, - runners: cfgfile.NewRunnerList("autodiscover.cfgfile", factory, pipeline), + runners: cfgfile.NewRunnerList("autodiscover.cfgfile", factory, pipeline, logger), providers: providers, meta: meta.NewMap(), logger: logger, diff --git a/libbeat/autodiscover/autodiscover_test.go b/libbeat/autodiscover/autodiscover_test.go index 5343c093941..b24b609f215 100644 --- a/libbeat/autodiscover/autodiscover_test.go +++ b/libbeat/autodiscover/autodiscover_test.go @@ -29,6 +29,8 @@ import ( "github.com/gofrs/uuid/v5" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" @@ -37,6 +39,7 @@ import ( conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/keystore" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/logp/logptest" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -116,10 +119,14 @@ func (m *mockAdapter) CheckConfig(c *conf.C) error { return nil } +// Create returns a mockRunner with the provided config. If +// the config contains `err_non_reloadable: true`, then a +// common.ErrNonReloadable is returned alongside a nil runner. 
func (m *mockAdapter) Create(_ beat.PipelineConnector, config *conf.C) (cfgfile.Runner, error) { runner := &mockRunner{ config: config, } + m.mutex.Lock() defer m.mutex.Unlock() m.runners = append(m.runners, runner) @@ -193,7 +200,8 @@ func TestAutodiscover(t *testing.T) { } k, _ := keystore.NewFileKeystore("test") // Create autodiscover manager - autodiscover, err := NewAutodiscover("test", nil, &adapter, &adapter, &config, k) + logger := logptest.NewTestingLogger(t, "") + autodiscover, err := NewAutodiscover("test", nil, &adapter, &adapter, &config, k, logger) if err != nil { t.Fatal(err) } @@ -346,8 +354,9 @@ func TestAutodiscoverHash(t *testing.T) { Providers: []*conf.C{providerConfig}, } k, _ := keystore.NewFileKeystore("test") + logger := logptest.NewTestingLogger(t, "") // Create autodiscover manager - autodiscover, err := NewAutodiscover("test", nil, &adapter, &adapter, &config, k) + autodiscover, err := NewAutodiscover("test", nil, &adapter, &adapter, &config, k, logger) if err != nil { t.Fatal(err) } @@ -410,8 +419,9 @@ func TestAutodiscoverDuplicatedConfigConfigCheckCalledOnce(t *testing.T) { Providers: []*conf.C{providerConfig}, } k, _ := keystore.NewFileKeystore("test") + logger := logptest.NewTestingLogger(t, "") // Create autodiscover manager - autodiscover, err := NewAutodiscover("test", nil, &adapter, &adapter, &config, k) + autodiscover, err := NewAutodiscover("test", nil, &adapter, &adapter, &config, k, logger) if err != nil { t.Fatal(err) } @@ -478,8 +488,9 @@ func TestAutodiscoverWithConfigCheckFailures(t *testing.T) { Providers: []*conf.C{providerConfig}, } k, _ := keystore.NewFileKeystore("test") + logger := logptest.NewTestingLogger(t, "") // Create autodiscover manager - autodiscover, err := NewAutodiscover("test", nil, &adapter, &adapter, &config, k) + autodiscover, err := NewAutodiscover("test", nil, &adapter, &adapter, &config, k, logger) if err != nil { t.Fatal(err) } @@ -537,8 +548,9 @@ func TestAutodiscoverWithMutlipleEntries(t 
*testing.T) { Providers: []*conf.C{providerConfig}, } k, _ := keystore.NewFileKeystore("test") + logger := logptest.NewTestingLogger(t, "") // Create autodiscover manager - autodiscover, err := NewAutodiscover("test", nil, &adapter, &adapter, &config, k) + autodiscover, err := NewAutodiscover("test", nil, &adapter, &adapter, &config, k, logger) if err != nil { t.Fatal(err) } @@ -653,9 +665,9 @@ func TestAutodiscoverDebounce(t *testing.T) { k, _ := keystore.NewFileKeystore("test") adapter := mockAdapter{} - + logger := logptest.NewTestingLogger(t, "") // Create autodiscover manager - autodiscover, err := NewAutodiscover("test", nil, &adapter, &adapter, &config, k) + autodiscover, err := NewAutodiscover("test", nil, &adapter, &adapter, &config, k, logger) if err != nil { t.Fatal(err) } @@ -730,15 +742,14 @@ func TestAutodiscoverDebounce(t *testing.T) { } func printDebugLogsOnFailure(t *testing.T) { - // Use the following line to have the logs being printed - // in real time. - // logp.DevelopmentSetup(logp.WithLevel(logp.DebugLevel), logp.WithSelectors("*")) - logp.DevelopmentSetup(logp.ToObserverOutput()) + observed, zapLogs := observer.New(zapcore.DebugLevel) + _, err := logp.ConfigureWithCoreLocal(logp.Config{}, observed) + require.NoError(t, err) t.Cleanup(func() { if t.Failed() { t.Logf("Debug Logs:\n") - for _, log := range logp.ObserverLogs().TakeAll() { + for _, log := range zapLogs.TakeAll() { data, err := json.Marshal(log) if err != nil { t.Errorf("failed encoding log as JSON: %s", err) diff --git a/libbeat/cfgfile/glob_manager.go b/libbeat/cfgfile/glob_manager.go index 47aeb505fdb..cc4606e9a8c 100644 --- a/libbeat/cfgfile/glob_manager.go +++ b/libbeat/cfgfile/glob_manager.go @@ -24,6 +24,8 @@ import ( "path/filepath" "sort" "strings" + + "github.com/elastic/elastic-agent-libs/logp" ) // GlobManager allows to manage a directory of conf files. 
Using a glob pattern @@ -34,6 +36,7 @@ type GlobManager struct { enabledExtension string disabledExtension string files []*CfgFile + logger *logp.Logger } type CfgFile struct { @@ -47,15 +50,16 @@ type CfgFile struct { // - glob - matching conf files (ie: modules.d/*.yml) // - enabledExtension - extension for enabled confs, must match the glob (ie: .yml) // - disabledExtension - extension to append for disabled confs (ie: .disabled) -func NewGlobManager(glob, enabledExtension, disabledExtension string) (*GlobManager, error) { +func NewGlobManager(glob, enabledExtension, disabledExtension string, logger *logp.Logger) (*GlobManager, error) { if !strings.HasSuffix(glob, enabledExtension) { - return nil, errors.New("Glob should have the enabledExtension as suffix") + return nil, errors.New("Glob should have the enabledExtension as suffix") //nolint:staticcheck //Keep old behavior } g := &GlobManager{ glob: glob, enabledExtension: enabledExtension, disabledExtension: disabledExtension, + logger: logger, } if err := g.load(); err != nil { return nil, err @@ -68,7 +72,7 @@ func (g *GlobManager) load() error { g.files = nil // Load enabled - watcher := NewGlobWatcher(g.glob) + watcher := NewGlobWatcher(g.glob, g.logger) files, _, err := watcher.Scan() if err != nil { return err @@ -84,7 +88,7 @@ func (g *GlobManager) load() error { } // Load disabled - watcher = NewGlobWatcher(g.glob + g.disabledExtension) + watcher = NewGlobWatcher(g.glob+g.disabledExtension, g.logger) files, _, err = watcher.Scan() if err != nil { return err diff --git a/libbeat/cfgfile/glob_manager_test.go b/libbeat/cfgfile/glob_manager_test.go index 808f1bc3394..8141a5a8cfe 100644 --- a/libbeat/cfgfile/glob_manager_test.go +++ b/libbeat/cfgfile/glob_manager_test.go @@ -18,43 +18,42 @@ package cfgfile import ( - "io/ioutil" "os" "path/filepath" "testing" "github.com/stretchr/testify/assert" + + "github.com/elastic/elastic-agent-libs/logp/logptest" ) func TestGlobManagerInit(t *testing.T) { // Wrong 
settings return error - manager, err := NewGlobManager("dir/*.yml", ".noyml", ".disabled") + logger := logptest.NewTestingLogger(t, "") + manager, err := NewGlobManager("dir/*.yml", ".noyml", ".disabled", logger) assert.Error(t, err) assert.Nil(t, manager) } func TestGlobManager(t *testing.T) { // Create random temp directory - dir, err := ioutil.TempDir("", "glob_manager") - defer os.RemoveAll(dir) - if err != nil { - t.Fatal(err) - } + dir := t.TempDir() // Prepare scenario: content := []byte("test\n") - err = ioutil.WriteFile(dir+"/config1.yml", content, 0644) + err := os.WriteFile(dir+"/config1.yml", content, 0644) assert.NoError(t, err) - err = ioutil.WriteFile(dir+"/config2.yml", content, 0644) + err = os.WriteFile(dir+"/config2.yml", content, 0644) assert.NoError(t, err) - err = ioutil.WriteFile(dir+"/config2-alt.yml.disabled", content, 0644) + err = os.WriteFile(dir+"/config2-alt.yml.disabled", content, 0644) assert.NoError(t, err) - err = ioutil.WriteFile(dir+"/config3.yml.disabled", content, 0644) + err = os.WriteFile(dir+"/config3.yml.disabled", content, 0644) assert.NoError(t, err) // Init Glob Manager glob := dir + "/*.yml" - manager, err := NewGlobManager(glob, ".yml", ".disabled") + logger := logptest.NewTestingLogger(t, "") + manager, err := NewGlobManager(glob, ".yml", ".disabled", logger) if err != nil { t.Fatal(err) } diff --git a/libbeat/cfgfile/glob_watcher.go b/libbeat/cfgfile/glob_watcher.go index e86ff13b635..6c86dc62bdb 100644 --- a/libbeat/cfgfile/glob_watcher.go +++ b/libbeat/cfgfile/glob_watcher.go @@ -31,13 +31,15 @@ type GlobWatcher struct { glob string lastScan time.Time lastHash uint64 + logger *logp.Logger } -func NewGlobWatcher(glob string) *GlobWatcher { +func NewGlobWatcher(glob string, logger *logp.Logger) *GlobWatcher { return &GlobWatcher{ lastScan: time.Time{}, lastHash: 0, glob: glob, + logger: logger, } } @@ -63,7 +65,7 @@ func (gw *GlobWatcher) Scan() ([]string, bool, error) { info, err := os.Stat(f) if err != nil { - 
logp.Err("Error getting stats for file: %s", f) + gw.logger.Errorf("Error getting stats for file: %s", f) continue } diff --git a/libbeat/cfgfile/glob_watcher_test.go b/libbeat/cfgfile/glob_watcher_test.go index b56a3048626..320325baf24 100644 --- a/libbeat/cfgfile/glob_watcher_test.go +++ b/libbeat/cfgfile/glob_watcher_test.go @@ -23,6 +23,8 @@ import ( "time" "github.com/stretchr/testify/assert" + + "github.com/elastic/elastic-agent-libs/logp/logptest" ) func TestGlobWatcher(t *testing.T) { @@ -30,7 +32,8 @@ func TestGlobWatcher(t *testing.T) { dir := t.TempDir() glob := dir + "/*.yml" - gcd := NewGlobWatcher(glob) + logger := logptest.NewTestingLogger(t, "") + gcd := NewGlobWatcher(glob, logger) content := []byte("test\n") err := os.WriteFile(dir+"/config1.yml", content, 0644) diff --git a/libbeat/cfgfile/list.go b/libbeat/cfgfile/list.go index d557ffa25c2..e2f31b1f4ad 100644 --- a/libbeat/cfgfile/list.go +++ b/libbeat/cfgfile/list.go @@ -45,12 +45,12 @@ type RunnerList struct { } // NewRunnerList builds and returns a RunnerList -func NewRunnerList(name string, factory RunnerFactory, pipeline beat.PipelineConnector) *RunnerList { +func NewRunnerList(name string, factory RunnerFactory, pipeline beat.PipelineConnector, logger *logp.Logger) *RunnerList { return &RunnerList{ runners: map[uint64]Runner{}, factory: factory, pipeline: pipeline, - logger: logp.NewLogger(name), + logger: logger.Named(name), } } @@ -97,7 +97,7 @@ func (r *RunnerList) Reload(configs []*reload.ConfigWithMeta) error { hash, err := HashConfig(config.Config) if err != nil { r.logger.Errorf("Unable to hash given config: %s", err) - errs = append(errs, fmt.Errorf("Unable to hash given config: %w", err)) + errs = append(errs, fmt.Errorf("Unable to hash given config: %w", err)) //nolint:staticcheck //Keep old behavior continue } diff --git a/libbeat/cfgfile/list_test.go b/libbeat/cfgfile/list_test.go index 2555e77773c..e552c195ae3 100644 --- a/libbeat/cfgfile/list_test.go +++ 
b/libbeat/cfgfile/list_test.go @@ -29,6 +29,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common/reload" pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing" conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp/logptest" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -115,7 +116,8 @@ func (r *testDiagHandler) Register(_ string, _ string, _ string, _ string, callb func TestDiagnostics(t *testing.T) { factory := &runnerFactory{} - list := NewRunnerList("", factory, nil) + logger := logptest.NewTestingLogger(t, "") + list := NewRunnerList("", factory, nil, logger) cfg := createConfig(1) callback := &testDiagHandler{} cfg.DiagCallback = callback @@ -129,7 +131,8 @@ func TestDiagnostics(t *testing.T) { func TestNewConfigs(t *testing.T) { factory := &runnerFactory{} - list := NewRunnerList("", factory, nil) + logger := logptest.NewTestingLogger(t, "") + list := NewRunnerList("", factory, nil, logger) err := list.Reload([]*reload.ConfigWithMeta{ createConfig(1), @@ -143,7 +146,9 @@ func TestNewConfigs(t *testing.T) { func TestReloadSameConfigs(t *testing.T) { factory := &runnerFactory{} - list := NewRunnerList("", factory, nil) + logger := logptest.NewTestingLogger(t, "") + + list := NewRunnerList("", factory, nil, logger) err := list.Reload([]*reload.ConfigWithMeta{ createConfig(1), @@ -168,7 +173,8 @@ func TestReloadSameConfigs(t *testing.T) { func TestReloadDuplicateConfig(t *testing.T) { factory := &runnerFactory{} - list := NewRunnerList("", factory, nil) + logger := logptest.NewTestingLogger(t, "") + list := NewRunnerList("", factory, nil, logger) err := list.Reload([]*reload.ConfigWithMeta{ createConfig(1), @@ -192,7 +198,8 @@ func TestReloadDuplicateConfig(t *testing.T) { func TestReloadStopConfigs(t *testing.T) { factory := &runnerFactory{} - list := NewRunnerList("", factory, nil) + logger := logptest.NewTestingLogger(t, "") + list := NewRunnerList("", factory, nil, logger) err := 
list.Reload([]*reload.ConfigWithMeta{ createConfig(1), @@ -214,7 +221,9 @@ func TestReloadStopConfigs(t *testing.T) { func TestReloadStartStopConfigs(t *testing.T) { factory := &runnerFactory{} - list := NewRunnerList("", factory, nil) + logger := logptest.NewTestingLogger(t, "") + + list := NewRunnerList("", factory, nil, logger) err := list.Reload([]*reload.ConfigWithMeta{ createConfig(1), @@ -239,7 +248,9 @@ func TestReloadStartStopConfigs(t *testing.T) { func TestStopAll(t *testing.T) { factory := &runnerFactory{} - list := NewRunnerList("", factory, nil) + logger := logptest.NewTestingLogger(t, "") + + list := NewRunnerList("", factory, nil, logger) err := list.Reload([]*reload.ConfigWithMeta{ createConfig(1), @@ -253,13 +264,15 @@ func TestStopAll(t *testing.T) { assert.Equal(t, len(list.copyRunnerList()), 0) for _, r := range list.runners { - assert.False(t, r.(*runner).stopped) + assert.False(t, r.(*runner).stopped) //nolint:errcheck //false positive } } func TestHas(t *testing.T) { factory := &runnerFactory{} - list := NewRunnerList("", factory, nil) + logger := logptest.NewTestingLogger(t, "") + + list := NewRunnerList("", factory, nil, logger) config := createConfig(1) hash, err := HashConfig(config.Config) diff --git a/libbeat/cfgfile/reload.go b/libbeat/cfgfile/reload.go index 930bd56eafd..fa480381a65 100644 --- a/libbeat/cfgfile/reload.go +++ b/libbeat/cfgfile/reload.go @@ -42,8 +42,6 @@ var ( }, } - debugf = logp.MakeDebug("cfgfile") - // configScans measures how many times the config dir was scanned for // changes, configReloads measures how many times there were changes that // triggered an actual reload. 
@@ -101,10 +99,11 @@ type Reloader struct { path string done chan struct{} wg sync.WaitGroup + logger *logp.Logger } // NewReloader creates new Reloader instance for the given config -func NewReloader(pipeline beat.PipelineConnector, cfg *config.C) *Reloader { +func NewReloader(logger *logp.Logger, pipeline beat.PipelineConnector, cfg *config.C) *Reloader { conf := DefaultDynamicConfig _ = cfg.Unpack(&conf) @@ -118,6 +117,7 @@ func NewReloader(pipeline beat.PipelineConnector, cfg *config.C) *Reloader { config: conf, path: path, done: make(chan struct{}), + logger: logger, } } @@ -128,8 +128,8 @@ func (rl *Reloader) Check(runnerFactory RunnerFactory) error { return nil } - debugf("Checking module configs from: %s", rl.path) - gw := NewGlobWatcher(rl.path) + rl.logger.Debugf("Checking module configs from: %s", rl.path) + gw := NewGlobWatcher(rl.path, rl.logger) files, _, err := gw.Scan() if err != nil { @@ -142,7 +142,7 @@ func (rl *Reloader) Check(runnerFactory RunnerFactory) error { return fmt.Errorf("loading configs: %w", err) } - debugf("Number of module configs found: %v", len(configs)) + rl.logger.Debugf("Number of module configs found: %v", len(configs)) // Initialize modules for _, c := range configs { @@ -160,9 +160,9 @@ func (rl *Reloader) Check(runnerFactory RunnerFactory) error { // Run runs the reloader func (rl *Reloader) Run(runnerFactory RunnerFactory) { - logp.Info("Config reloader started") + rl.logger.Info("Config reloader started") - list := NewRunnerList("reload", runnerFactory, rl.pipeline) + list := NewRunnerList("reload", runnerFactory, rl.pipeline, rl.logger) rl.wg.Add(1) defer rl.wg.Done() @@ -170,7 +170,7 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) { // Stop all running modules when method finishes defer list.Stop() - gw := NewGlobWatcher(rl.path) + gw := NewGlobWatcher(rl.path, rl.logger) // If reloading is disable, config files should be loaded immediately if !rl.config.Reload.Enabled { @@ -186,18 +186,18 @@ func (rl 
*Reloader) Run(runnerFactory RunnerFactory) { for { select { case <-rl.done: - logp.Info("Dynamic config reloader stopped") + rl.logger.Info("Dynamic config reloader stopped") return case <-time.After(rl.config.Reload.Period): - debugf("Scan for new config files") + rl.logger.Debug("Scan for new config files") configScans.Add(1) files, updated, err := gw.Scan() if err != nil { // In most cases of error, updated == false, so will continue // to next iteration below - logp.Err("Error fetching new config files: %v", err) + rl.logger.Errorf("Error fetching new config files: %v", err) } // if there are no changes, skip this reload unless forceReload is set. @@ -209,20 +209,21 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) { // Load all config objects configs, _ := rl.loadConfigs(files) - debugf("Number of module configs found: %v", len(configs)) + rl.logger.Debugf("Number of module configs found: %v", len(configs)) err = list.Reload(configs) - // Force reload on the next iteration if and only if this one failed. - // (Any errors are already logged by list.Reload, so we don't need to - // propagate the details further.) + // Force reload on the next iteration if and only if the error + // can be retried. + // Errors are already logged by list.Reload, so we don't need to + // propagate details any further. forceReload = err != nil } // Path loading is enabled but not reloading. Loads files only once and then stops. if !rl.config.Reload.Enabled { - logp.Info("Loading of config files completed.") + rl.logger.Info("Loading of config files completed.") <-rl.done - logp.Info("Dynamic config reloader stopped") + rl.logger.Info("Dynamic config reloader stopped") return } } @@ -230,7 +231,7 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) { // Load loads configuration files once. 
func (rl *Reloader) Load(runnerFactory RunnerFactory) { - list := NewRunnerList("load", runnerFactory, rl.pipeline) + list := NewRunnerList("load", runnerFactory, rl.pipeline, rl.logger) rl.wg.Add(1) defer rl.wg.Done() @@ -238,25 +239,25 @@ func (rl *Reloader) Load(runnerFactory RunnerFactory) { // Stop all running modules when method finishes defer list.Stop() - gw := NewGlobWatcher(rl.path) + gw := NewGlobWatcher(rl.path, rl.logger) - debugf("Scan for config files") + rl.logger.Debug("Scan for config files") files, _, err := gw.Scan() if err != nil { - logp.Err("Error fetching new config files: %v", err) + rl.logger.Errorf("Error fetching new config files: %v", err) } // Load all config objects configs, _ := rl.loadConfigs(files) - debugf("Number of module configs found: %v", len(configs)) + rl.logger.Debugf("Number of module configs found: %v", len(configs)) if err := list.Reload(configs); err != nil { - logp.Err("Error loading configuration files: %+v", err) + rl.logger.Errorf("Error loading configuration files: %+v", err) return } - logp.Info("Loading of config files completed.") + rl.logger.Info("Loading of config files completed.") } func (rl *Reloader) loadConfigs(files []string) ([]*reload.ConfigWithMeta, error) { @@ -267,7 +268,7 @@ func (rl *Reloader) loadConfigs(files []string) ([]*reload.ConfigWithMeta, error configs, err := LoadList(file) if err != nil { errs = append(errs, err) - logp.Err("Error loading config from file '%s', error %v", file, err) + rl.logger.Errorf("Error loading config from file '%s', error %v", file, err) continue } diff --git a/libbeat/cfgfile/reload_test.go b/libbeat/cfgfile/reload_test.go index f28fbd7033a..8bff09b4ecb 100644 --- a/libbeat/cfgfile/reload_test.go +++ b/libbeat/cfgfile/reload_test.go @@ -29,6 +29,7 @@ import ( "github.com/stretchr/testify/assert" conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp/logptest" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -49,7 
+50,7 @@ func TestReloader(t *testing.T) { }, }) // config.C{} - reloader := NewReloader(nil, config) + reloader := NewReloader(logptest.NewTestingLogger(t, ""), nil, config) retryCount := 10 go reloader.Run(nil) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 0feeb50b766..e083b317704 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -329,7 +329,7 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) { log := b.Info.Logger.Named("beat") log.Infof("Setup Beat: %s; Version: %s", b.Info.Beat, b.Info.Version) - b.logSystemInfo() + b.logSystemInfo(log) err = b.registerESVersionCheckCallback() if err != nil { @@ -348,7 +348,7 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) { reg = monitoring.Default.NewRegistry("libbeat") } - err = metricreport.SetupMetrics(b.Beat.Info.Logger.Named("metrics"), b.Info.Beat, version.GetDefaultVersion()) + err = metricreport.SetupMetrics(b.Info.Logger.Named("metrics"), b.Info.Beat, version.GetDefaultVersion()) if err != nil { return nil, err } @@ -788,6 +788,14 @@ func (b *Beat) configure(settings Settings) error { return fmt.Errorf("error unpacking config data: %w", err) } + b.Info.Logger, err = configure.LoggingWithTypedOutputsLocal(b.Info.Beat, b.Config.Logging, b.Config.EventLogging, logp.TypeKey, logp.EventType) + if err != nil { + return fmt.Errorf("error initializing logging: %w", err) + } + + // extracting here for ease of use + logger := b.Info.Logger + if err := PromoteOutputQueueSettings(b); err != nil { return fmt.Errorf("could not promote output queue settings: %w", err) } @@ -807,19 +815,11 @@ func (b *Beat) configure(settings Settings) error { return fmt.Errorf("error setting timestamp precision: %w", err) } - b.Info.Logger, err = configure.LoggingWithTypedOutputsLocal(b.Info.Beat, b.Config.Logging, b.Config.EventLogging, logp.TypeKey, logp.EventType) - if err != nil { - return fmt.Errorf("error initializing logging: %w", err) - } 
- - // extracting here for ease of use - logger := b.Info.Logger - instrumentation, err := instrumentation.New(cfg, b.Info.Beat, b.Info.Version, b.Info.Logger) if err != nil { return err } - b.Beat.Instrumentation = instrumentation + b.Instrumentation = instrumentation // log paths values to help with troubleshooting logger.Infof("%s", paths.Paths.String()) @@ -845,7 +845,7 @@ func (b *Beat) configure(settings Settings) error { if err != nil { // FQDN lookup is "best effort". We log the error, fallback to // the OS-reported hostname, and move on. - logger.Infof("unable to lookup FQDN: %s, using hostname = %s as FQDN", err.Error(), b.Info.Hostname) + logger.Warnf("unable to lookup FQDN: %s, using hostname = %s as FQDN", err.Error(), b.Info.Hostname) b.Info.FQDN = b.Info.Hostname } else { b.Info.FQDN = fqdn @@ -899,7 +899,7 @@ func (b *Beat) configure(settings Settings) error { if imFactory == nil { imFactory = idxmgmt.MakeDefaultSupport(settings.ILM, logger) } - b.IdxSupporter, err = imFactory(logger, b.Beat.Info, b.RawConfig) + b.IdxSupporter, err = imFactory(logger, b.Info, b.RawConfig) if err != nil { return err } @@ -946,7 +946,7 @@ func (b *Beat) LoadMeta(metaPath string) error { FirstStart time.Time `json:"first_start"` } - b.Info.Logger.Debugf("beat", "Beat metadata path: %v", metaPath) + b.Info.Logger.Named("beat").Debugf("Beat metadata path: %v", metaPath) f, err := openRegular(metaPath) if err != nil && !os.IsNotExist(err) { @@ -955,7 +955,7 @@ func (b *Beat) LoadMeta(metaPath string) error { if err == nil { m := meta{} - if err := json.NewDecoder(f).Decode(&m); err != nil && err != io.EOF { //nolint:errorlint // keep old behaviour + if err := json.NewDecoder(f).Decode(&m); err != nil && err != io.EOF { f.Close() return fmt.Errorf("Beat meta file reading error: %w", err) } @@ -1237,7 +1237,6 @@ func (b *Beat) reloadOutputOnCertChange(cfg config.Namespace) error { // Watch for file changes while the Beat is alive go func() { - //nolint:staticcheck // this 
is an endless function ticker := time.Tick(extendedTLSCfg.Reload.Period) for { @@ -1363,8 +1362,7 @@ func handleError(err error) error { // in debugging. This information includes data about the beat, build, go // runtime, host, and process. If any of the data is not available it will be // omitted. -func (b *Beat) logSystemInfo() { - log := b.Beat.Info.Logger +func (b *Beat) logSystemInfo(log *logp.Logger) { defer log.Recover("An unexpected error occurred while collecting " + "information about the system.") log = log.With(logp.Namespace("system_info")) diff --git a/libbeat/cmd/instance/beat_test.go b/libbeat/cmd/instance/beat_test.go index e4e2fd8b630..33bca319188 100644 --- a/libbeat/cmd/instance/beat_test.go +++ b/libbeat/cmd/instance/beat_test.go @@ -485,7 +485,7 @@ func TestLogSystemInfo(t *testing.T) { buff.Reset() b.Manager = mockManager{enabled: tc.managed} - b.logSystemInfo() + b.logSystemInfo(log) tc.assertFn(t, buff) } diff --git a/libbeat/cmd/test/output.go b/libbeat/cmd/test/output.go index fdbfbcee4be..dc29c71f571 100644 --- a/libbeat/cmd/test/output.go +++ b/libbeat/cmd/test/output.go @@ -40,7 +40,7 @@ func GenTestOutputCmd(settings instance.Settings) *cobra.Command { os.Exit(1) } - im, _ := idxmgmt.DefaultSupport(b.Info.Logger, b.Info, nil) + im, _ := idxmgmt.DefaultSupport(b.Info, nil) output, err := outputs.Load(im, b.Info, nil, b.Config.Output.Name(), b.Config.Output.Config()) if err != nil { fmt.Fprintf(os.Stderr, "Error initializing output: %s\n", err) @@ -50,7 +50,7 @@ func GenTestOutputCmd(settings instance.Settings) *cobra.Command { for _, client := range output.Clients { tClient, ok := client.(testing.Testable) if !ok { - fmt.Printf("%s output doesn't support testing\n", b.Config.Output.Name()) + fmt.Printf("%s output doesn't support testing\n", b.Config.Output.Name()) //nolint:forbidigo //output to stderr before exiting os.Exit(1) } diff --git a/libbeat/dashboards/dashboards.go b/libbeat/dashboards/dashboards.go index 
90004321eca..3284d3a59e5 100644 --- a/libbeat/dashboards/dashboards.go +++ b/libbeat/dashboards/dashboards.go @@ -54,20 +54,20 @@ func ImportDashboards( return errors.New("kibana configuration missing for loading dashboards") } - return setupAndImportDashboardsViaKibana(ctx, beatInfo.Hostname, beatInfo.Beat, kibanaConfig, &dashConfig, msgOutputter, pattern) + return setupAndImportDashboardsViaKibana(ctx, beatInfo, kibanaConfig, &dashConfig, msgOutputter, pattern) } -func setupAndImportDashboardsViaKibana(ctx context.Context, hostname, beatname string, kibanaConfig *config.C, +func setupAndImportDashboardsViaKibana(ctx context.Context, beatInfo beat.Info, kibanaConfig *config.C, dashboardsConfig *Config, msgOutputter MessageOutputter, fields mapstr.M) error { - kibanaLoader, err := NewKibanaLoader(ctx, kibanaConfig, dashboardsConfig, hostname, msgOutputter, beatname) + kibanaLoader, err := NewKibanaLoader(ctx, kibanaConfig, dashboardsConfig, msgOutputter, beatInfo) if err != nil { - return fmt.Errorf("fail to create the Kibana loader: %v", err) + return fmt.Errorf("fail to create the Kibana loader: %w", err) } defer kibanaLoader.Close() - kibanaLoader.statusMsg("Kibana URL %v", kibanaLoader.client.Connection.URL) + kibanaLoader.statusMsg("Kibana URL %v", kibanaLoader.client.URL) return ImportDashboardsViaKibana(kibanaLoader, fields) } @@ -76,16 +76,16 @@ func setupAndImportDashboardsViaKibana(ctx context.Context, hostname, beatname s func ImportDashboardsViaKibana(kibanaLoader *KibanaLoader, fields mapstr.M) error { version := kibanaLoader.version if !version.IsValid() { - return errors.New("No valid kibana version available") + return errors.New("No valid kibana version available") //nolint:staticcheck //Keep old behavior } if !isKibanaAPIavailable(kibanaLoader.version) { - return fmt.Errorf("Kibana API is not available in Kibana version %s", kibanaLoader.version.String()) + return fmt.Errorf("Kibana API is not available in Kibana version %s", 
kibanaLoader.version.String()) //nolint:staticcheck //Keep old behavior } importer, err := NewImporter(version, kibanaLoader.config, *kibanaLoader, fields) if err != nil { - return fmt.Errorf("fail to create a Kibana importer for loading the dashboards: %v", err) + return fmt.Errorf("fail to create a Kibana importer for loading the dashboards: %w", err) } if err := importer.Import(); err != nil { diff --git a/libbeat/dashboards/kibana_loader.go b/libbeat/dashboards/kibana_loader.go index 3320f996a2a..7826db3f5c9 100644 --- a/libbeat/dashboards/kibana_loader.go +++ b/libbeat/dashboards/kibana_loader.go @@ -28,6 +28,7 @@ import ( "github.com/joeshaw/multierror" + "github.com/elastic/beats/v7/libbeat/beat" beatversion "github.com/elastic/beats/v7/libbeat/version" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/kibana" @@ -48,35 +49,35 @@ var ( // KibanaLoader loads Kibana files type KibanaLoader struct { - client *kibana.Client - config *Config - version version.V - hostname string - msgOutputter MessageOutputter - defaultLogger *logp.Logger + client *kibana.Client + config *Config + version version.V + hostname string + msgOutputter MessageOutputter + logger *logp.Logger loadedAssets map[string]bool } // NewKibanaLoader creates a new loader to load Kibana files -func NewKibanaLoader(ctx context.Context, cfg *config.C, dashboardsConfig *Config, hostname string, msgOutputter MessageOutputter, beatname string) (*KibanaLoader, error) { +func NewKibanaLoader(ctx context.Context, cfg *config.C, dashboardsConfig *Config, msgOutputter MessageOutputter, beatInfo beat.Info) (*KibanaLoader, error) { if cfg == nil || !cfg.Enabled() { return nil, fmt.Errorf("kibana is not configured or enabled") } - client, err := getKibanaClient(ctx, cfg, dashboardsConfig.Retry, 0, beatname) + client, err := getKibanaClient(ctx, cfg, dashboardsConfig.Retry, 0, beatInfo.Beat) if err != nil { return nil, fmt.Errorf("Error creating Kibana client: %w", err) } 
loader := KibanaLoader{ - client: client, - config: dashboardsConfig, - version: client.GetVersion(), - hostname: hostname, - msgOutputter: msgOutputter, - defaultLogger: logp.NewLogger("dashboards"), - loadedAssets: make(map[string]bool, 0), + client: client, + config: dashboardsConfig, + version: client.GetVersion(), + hostname: beatInfo.Hostname, + msgOutputter: msgOutputter, + logger: beatInfo.Logger.Named("dashboards"), + loadedAssets: make(map[string]bool, 0), } version := client.GetVersion() @@ -104,7 +105,7 @@ func getKibanaClient(ctx context.Context, cfg *config.C, retryCfg *Retry, retryA // ImportIndexFile imports an index pattern from a file func (loader KibanaLoader) ImportIndexFile(file string) error { if loader.version.LessThan(minimumRequiredVersionSavedObjects) { - return fmt.Errorf("Kibana version must be at least %s", minimumRequiredVersionSavedObjects.String()) + return fmt.Errorf("Kibana version must be at least %s", minimumRequiredVersionSavedObjects.String()) //nolint:staticcheck //Keep old behavior } loader.statusMsg("Importing index file from %s", file) @@ -149,7 +150,7 @@ func (loader KibanaLoader) ImportIndex(pattern mapstr.M) error { // ImportDashboard imports the dashboard file func (loader KibanaLoader) ImportDashboard(file string) error { if loader.version.LessThan(minimumRequiredVersionSavedObjects) { - return fmt.Errorf("Kibana version must be at least %s", minimumRequiredVersionSavedObjects.String()) + return fmt.Errorf("Kibana version must be at least %s", minimumRequiredVersionSavedObjects.String()) //nolint:staticcheck //Keep old behavior } loader.statusMsg("Importing dashboard from %s", file) @@ -256,6 +257,6 @@ func (loader KibanaLoader) statusMsg(msg string, a ...interface{}) { if loader.msgOutputter != nil { loader.msgOutputter(msg, a...) } else { - loader.defaultLogger.Debugf(msg, a...) + loader.logger.Debugf(msg, a...) 
} } diff --git a/libbeat/idxmgmt/client_handler.go b/libbeat/idxmgmt/client_handler.go index ae92f75a36c..b97bd1cc860 100644 --- a/libbeat/idxmgmt/client_handler.go +++ b/libbeat/idxmgmt/client_handler.go @@ -64,7 +64,7 @@ func NewESClientHandler(client ESClient, info beat.Info, cfg lifecycle.RawConfig if err != nil { return nil, fmt.Errorf("error creating ES handler: %w", err) } - loader, err := template.NewESLoader(client, esHandler) + loader, err := template.NewESLoader(client, esHandler, info.Logger) if err != nil { return nil, fmt.Errorf("error creating ES loader: %w", err) } @@ -78,5 +78,5 @@ func NewFileClientHandler(client FileClient, info beat.Info, cfg lifecycle.RawCo if err != nil { return nil, fmt.Errorf("error creating client handler: %w", err) } - return NewClientHandler(mgmt, template.NewFileLoader(client, mgmt.Mode() == lifecycle.DSL)), nil + return NewClientHandler(mgmt, template.NewFileLoader(client, mgmt.Mode() == lifecycle.DSL, info.Logger)), nil } diff --git a/libbeat/idxmgmt/idxmgmt.go b/libbeat/idxmgmt/idxmgmt.go index fa5a547dd3b..399a2b6cb40 100644 --- a/libbeat/idxmgmt/idxmgmt.go +++ b/libbeat/idxmgmt/idxmgmt.go @@ -92,9 +92,9 @@ func (m *LoadMode) Enabled() bool { } // DefaultSupport initializes the default index management support used by most Beats. 
-func DefaultSupport(log *logp.Logger, info beat.Info, configRoot *config.C) (Supporter, error) { - factory := MakeDefaultSupport(nil, log) - return factory(log, info, configRoot) +func DefaultSupport(info beat.Info, configRoot *config.C) (Supporter, error) { + factory := MakeDefaultSupport(nil, info.Logger) + return factory(info.Logger, info, configRoot) } // MakeDefaultSupport creates some default index management support, with a @@ -123,13 +123,9 @@ func MakeDefaultSupport(ilmSupport lifecycle.SupportFactory, logger *logp.Logger // consider lifecycles enabled if the user has explicitly enabled them, // or if no `enabled` setting has been set by the user, thus reverting to a default of enabled. - enabled := false - if cfg.Lifecycle.DSL.Enabled() || cfg.Lifecycle.ILM.Enabled() { - enabled = true - } - if (cfg.Lifecycle.DSL == nil || !cfg.Lifecycle.DSL.HasField("enabled")) && (cfg.Lifecycle.ILM == nil || !cfg.Lifecycle.ILM.HasField("enabled")) { - enabled = true - } + enabled := cfg.Lifecycle.DSL.Enabled() || cfg.Lifecycle.ILM.Enabled() || + ((cfg.Lifecycle.DSL == nil || !cfg.Lifecycle.DSL.HasField("enabled")) && + (cfg.Lifecycle.ILM == nil || !cfg.Lifecycle.ILM.HasField("enabled"))) if err := checkTemplateESSettings(cfg.Template, cfg.Output); err != nil { return nil, err diff --git a/libbeat/idxmgmt/lifecycle/es_client_handler.go b/libbeat/idxmgmt/lifecycle/es_client_handler.go index 973473449b9..84f2a4ac5cd 100644 --- a/libbeat/idxmgmt/lifecycle/es_client_handler.go +++ b/libbeat/idxmgmt/lifecycle/es_client_handler.go @@ -23,7 +23,6 @@ import ( "net/http" "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -95,7 +94,7 @@ func NewESClientHandler(c ESClient, info beat.Info, cfg RawConfig) (*ESClientHan // if the user has set both to different values, throw a warning, as overwrite operations will probably fail if c.IsServerless() { if cfg.TemplateName != "" && cfg.TemplateName 
!= name { - logp.L().Warnf("setup.dsl.data_stream_pattern is %s, but setup.template.name is %s; under serverless, non-default template and DSL pattern names should be the same. Additional updates & overwrites to this config will not work.", name, cfg.TemplateName) + info.Logger.Warnf("setup.dsl.data_stream_pattern is %s, but setup.template.name is %s; under serverless, non-default template and DSL pattern names should be the same. Additional updates & overwrites to this config will not work.", name, cfg.TemplateName) } } diff --git a/libbeat/idxmgmt/lifecycle/file_client_handler.go b/libbeat/idxmgmt/lifecycle/file_client_handler.go index 53e4030944c..699d60ba638 100644 --- a/libbeat/idxmgmt/lifecycle/file_client_handler.go +++ b/libbeat/idxmgmt/lifecycle/file_client_handler.go @@ -21,7 +21,6 @@ import ( "fmt" "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -63,7 +62,7 @@ func NewFileClientHandler(c FileClient, info beat.Info, cfg RawConfig) (*FileCli lifecycleCfg = DefaultILMConfig(info).ILM err = cfg.ILM.Unpack(&lifecycleCfg) } else { - logp.L().Infof("No lifecycle config has been explicitly enabled, defauling to ILM") + info.Logger.Infof("No lifecycle config has been explicitly enabled, defauling to ILM") } if err != nil { diff --git a/libbeat/mock/mockbeat.go b/libbeat/mock/mockbeat.go index 701a47dfcab..c67468f8676 100644 --- a/libbeat/mock/mockbeat.go +++ b/libbeat/mock/mockbeat.go @@ -43,7 +43,7 @@ type Mockbeat struct { func New(b *beat.Beat, _ *config.C) (beat.Beater, error) { return &Mockbeat{ done: make(chan struct{}), - logger: logp.NewLogger("mock"), + logger: b.Info.Logger.Named("mock"), }, nil } diff --git a/libbeat/outputs/console/console.go b/libbeat/outputs/console/console.go index 867316fe3a9..23997e53654 100644 --- a/libbeat/outputs/console/console.go +++ b/libbeat/outputs/console/console.go @@ -72,7 +72,7 @@ func makeConsole( } index := beat.Beat - c, 
err := newConsole(index, observer, enc) + c, err := newConsole(index, observer, enc, beat.Logger) if err != nil { return outputs.Fail(fmt.Errorf("console output initialization failed with: %w", err)) } @@ -88,8 +88,8 @@ func makeConsole( return outputs.Success(config.Queue, config.BatchSize, 0, nil, c) } -func newConsole(index string, observer outputs.Observer, codec codec.Codec) (*console, error) { - c := &console{log: logp.NewLogger("console"), out: os.Stdout, codec: codec, observer: observer, index: index} +func newConsole(index string, observer outputs.Observer, codec codec.Codec, logger *logp.Logger) (*console, error) { + c := &console{log: logger.Named("console"), out: os.Stdout, codec: codec, observer: observer, index: index} c.writer = bufio.NewWriterSize(c.out, 8*1024) return c, nil } diff --git a/libbeat/outputs/console/console_test.go b/libbeat/outputs/console/console_test.go index 8e2ba6d81aa..dcff60f4e95 100644 --- a/libbeat/outputs/console/console_test.go +++ b/libbeat/outputs/console/console_test.go @@ -36,6 +36,8 @@ import ( "github.com/elastic/beats/v7/libbeat/outputs/codec/json" "github.com/elastic/beats/v7/libbeat/outputs/outest" "github.com/elastic/beats/v7/libbeat/publisher" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/logp/logptest" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -113,8 +115,9 @@ func TestConsoleOutput(t *testing.T) { for _, test := range tests { test := test t.Run(test.title, func(t *testing.T) { + logger := logptest.NewTestingLogger(t, "") batch := outest.NewBatch(test.events...) 
- lines, err := run(test.codec, batch) + lines, err := run(test.codec, logger, batch) assert.NoError(t, err) assert.Equal(t, test.expected, lines) @@ -127,9 +130,9 @@ func TestConsoleOutput(t *testing.T) { } } -func run(codec codec.Codec, batches ...publisher.Batch) (string, error) { +func run(codec codec.Codec, logger *logp.Logger, batches ...publisher.Batch) (string, error) { return withStdout(func() { - c, _ := newConsole("test", outputs.NewNilObserver(), codec) + c, _ := newConsole("test", outputs.NewNilObserver(), codec, logger) for _, b := range batches { c.Publish(context.Background(), b) } diff --git a/libbeat/outputs/discard/discard.go b/libbeat/outputs/discard/discard.go index bfd1a1c1add..cd4f4a132da 100644 --- a/libbeat/outputs/discard/discard.go +++ b/libbeat/outputs/discard/discard.go @@ -44,7 +44,7 @@ func makeDiscard( cfg *config.C, ) (outputs.Group, error) { out := &discardOutput{ - log: logp.NewLogger("discard"), + log: beat.Logger.Named("discard"), beat: beat, observer: observer, } diff --git a/libbeat/outputs/elasticsearch/bulk_test.go b/libbeat/outputs/elasticsearch/bulk_test.go index c56730f64c0..d1bdd335018 100644 --- a/libbeat/outputs/elasticsearch/bulk_test.go +++ b/libbeat/outputs/elasticsearch/bulk_test.go @@ -23,6 +23,7 @@ import ( "testing" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/logp/logptest" "github.com/stretchr/testify/assert" ) @@ -72,16 +73,19 @@ func TestBulkReadToItems(t *testing.T) { func TestBulkReadItemStatus(t *testing.T) { response := []byte(`{"create": {"status": 200}}`) + logger := logptest.NewTestingLogger(t, "") reader := newJSONReader(response) - code, _, err := bulkReadItemStatus(logp.L(), reader) + code, _, err := bulkReadItemStatus(logger, reader) assert.NoError(t, err) assert.Equal(t, 200, code) } func TestESNoErrorStatus(t *testing.T) { response := []byte(`{"create": {"status": 200}}`) - code, msg, err := readStatusItem(response) + logger := 
logptest.NewTestingLogger(t, "") + + code, msg, err := readStatusItem(response, logger) assert.NoError(t, err) assert.Equal(t, 200, code) @@ -90,7 +94,9 @@ func TestESNoErrorStatus(t *testing.T) { func TestES1StyleErrorStatus(t *testing.T) { response := []byte(`{"create": {"status": 400, "error": "test error"}}`) - code, msg, err := readStatusItem(response) + logger := logptest.NewTestingLogger(t, "") + + code, msg, err := readStatusItem(response, logger) assert.NoError(t, err) assert.Equal(t, 400, code) @@ -99,7 +105,9 @@ func TestES1StyleErrorStatus(t *testing.T) { func TestES2StyleErrorStatus(t *testing.T) { response := []byte(`{"create": {"status": 400, "error": {"reason": "test_error"}}}`) - code, msg, err := readStatusItem(response) + logger := logptest.NewTestingLogger(t, "") + + code, msg, err := readStatusItem(response, logger) assert.NoError(t, err) assert.Equal(t, 400, code) @@ -118,14 +126,15 @@ func TestES2StyleExtendedErrorStatus(t *testing.T) { } } }`) - code, _, err := readStatusItem(response) + logger := logptest.NewTestingLogger(t, "") + code, _, err := readStatusItem(response, logger) assert.NoError(t, err) assert.Equal(t, 400, code) } -func readStatusItem(in []byte) (int, string, error) { +func readStatusItem(in []byte, logger *logp.Logger) (int, string, error) { reader := newJSONReader(in) - code, msg, err := bulkReadItemStatus(logp.L(), reader) + code, msg, err := bulkReadItemStatus(logger, reader) return code, string(msg), err } diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 56f28cdbf30..c39a7c66350 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -43,7 +43,7 @@ import ( var ( errPayloadTooLarge = errors.New("the bulk payload is too large for the server. Consider to adjust `http.max_content_length` parameter in Elasticsearch or `bulk_max_size` in the beat. The batch has been dropped") - ErrTooOld = errors.New("Elasticsearch is too old. 
Please upgrade the instance. If you would like to connect to older instances set output.elasticsearch.allow_older_versions to true.") + ErrTooOld = errors.New("Elasticsearch is too old. Please upgrade the instance. If you would like to connect to older instances set output.elasticsearch.allow_older_versions to true") //nolint:staticcheck //false positive ) // Client is an elasticsearch client. @@ -122,6 +122,7 @@ var bulkRequestParams = map[string]string{ func NewClient( s clientSettings, onConnect *callbacksRegistry, + logger *logp.Logger, ) (*Client, error) { pipeline := s.pipelineSelector if pipeline != nil && pipeline.IsEmpty() { @@ -164,7 +165,7 @@ func NewClient( observer = outputs.NewNilObserver() } - log := logp.NewLogger("elasticsearch") + log := logger.Named("elasticsearch") pLogDeadLetter := periodic.NewDoer(10*time.Second, func(count uint64, d time.Duration) { @@ -236,6 +237,7 @@ func (client *Client) Clone() *Client { deadLetterIndex: client.deadLetterIndex, }, nil, // XXX: do not pass connection callback? + client.log, ) return c } @@ -351,7 +353,7 @@ func (client *Client) bulkEncodePublishRequest(version version.V, data []publish client.log.Error("Elasticsearch output received unencoded publisher.Event") continue } - event := data[i].EncodedEvent.(*encodedEvent) + event := data[i].EncodedEvent.(*encodedEvent) //nolint:errcheck //safe to ignore type check if event.err != nil { // This means there was an error when encoding the event and it isn't // ingestable, so report the error and continue. 
@@ -477,7 +479,7 @@ func (client *Client) applyItemStatus( itemMessage []byte, stats *bulkResultStats, ) bool { - encodedEvent := event.EncodedEvent.(*encodedEvent) + encodedEvent := event.EncodedEvent.(*encodedEvent) //nolint:errcheck //safe to ignore type check if itemStatus < 300 { if encodedEvent.deadLetter { // This was ingested into the dead letter index, not the original target diff --git a/libbeat/outputs/elasticsearch/client_integration_test.go b/libbeat/outputs/elasticsearch/client_integration_test.go index 7903925c77b..cee1cbab498 100644 --- a/libbeat/outputs/elasticsearch/client_integration_test.go +++ b/libbeat/outputs/elasticsearch/client_integration_test.go @@ -22,7 +22,7 @@ package elasticsearch import ( "context" "fmt" - "math/rand" + "math/rand/v2" "testing" "time" @@ -37,7 +37,6 @@ import ( "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/outputs/outest" conf "github.com/elastic/elastic-agent-libs/config" - "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/logp/logptest" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/monitoring" @@ -119,8 +118,6 @@ func testPublishEvent(t *testing.T, index string, cfg map[string]interface{}) { func TestClientPublishEventWithPipeline(t *testing.T) { type obj map[string]interface{} - logp.TestingSetup(logp.WithSelectors("elasticsearch")) - index := "beat-int-pub-single-with-pipeline" pipeline := "beat-int-pub-single-pipeline" @@ -202,8 +199,6 @@ func TestClientPublishEventWithPipeline(t *testing.T) { func TestClientBulkPublishEventsWithDeadletterIndex(t *testing.T) { type obj map[string]interface{} - logp.TestingSetup(logp.WithSelectors("elasticsearch")) - index := "beat-int-test-dli-index" deadletterIndex := "beat-int-test-dli-dead-letter-index" @@ -264,8 +259,6 @@ func TestClientBulkPublishEventsWithDeadletterIndex(t *testing.T) { func TestClientBulkPublishEventsWithPipeline(t *testing.T) { type obj 
map[string]interface{} - logp.TestingSetup(logp.WithSelectors("elasticsearch")) - index := "beat-int-pub-bulk-with-pipeline" pipeline := "beat-int-pub-bulk-pipeline" @@ -416,10 +409,10 @@ func connectTestEs(t *testing.T, cfg interface{}, stats outputs.Observer) (outpu t.Fatal(err) } - info := beat.Info{Beat: "libbeat"} + logger := logptest.NewTestingLogger(t, "elasticsearch") + info := beat.Info{Beat: "libbeat", Logger: logger} // disable ILM if using specified index name - logger := logptest.NewTestingLogger(t, "") - im, _ := idxmgmt.DefaultSupport(logger, info, conf.MustNewConfigFrom(map[string]interface{}{"setup.ilm.enabled": "false"})) + im, _ := idxmgmt.DefaultSupport(info, conf.MustNewConfigFrom(map[string]interface{}{"setup.ilm.enabled": "false"})) output, err := makeES(im, info, stats, config) if err != nil { t.Fatal(err) @@ -429,7 +422,8 @@ func connectTestEs(t *testing.T, cfg interface{}, stats outputs.Observer) (outpu outputs.NetworkClient Client() outputs.NetworkClient } - client := randomClient(output).(clientWrap).Client().(*Client) + client, ok := randomClient(output).(clientWrap).Client().(*Client) + assert.True(t, ok) // Load version ctx ctx, cancel := context.WithCancel(context.Background()) @@ -474,6 +468,6 @@ func randomClient(grp outputs.Group) outputs.NetworkClient { panic("no elasticsearch client") } - client := grp.Clients[rand.Intn(L)] - return client.(outputs.NetworkClient) + client := grp.Clients[rand.IntN(L)] + return client.(outputs.NetworkClient) //nolint:errcheck //This is a test file, can ignore } diff --git a/libbeat/outputs/elasticsearch/client_proxy_test.go b/libbeat/outputs/elasticsearch/client_proxy_test.go index b0cef282487..a948f92e3d0 100644 --- a/libbeat/outputs/elasticsearch/client_proxy_test.go +++ b/libbeat/outputs/elasticsearch/client_proxy_test.go @@ -37,6 +37,7 @@ import ( "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" "github.com/elastic/beats/v7/libbeat/outputs/outil" + 
"github.com/elastic/elastic-agent-libs/logp/logptest" "github.com/elastic/elastic-agent-libs/transport/httpcommon" ) @@ -207,7 +208,8 @@ func doClientPing(t *testing.T) { clientSettings.connection.Transport.Proxy.URL = &proxyURL } - client, err := NewClient(clientSettings, nil) + logger := logptest.NewTestingLogger(t, "") + client, err := NewClient(clientSettings, nil, logger) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go index 63d1c095f54..3c2f69f4a25 100644 --- a/libbeat/outputs/elasticsearch/client_test.go +++ b/libbeat/outputs/elasticsearch/client_test.go @@ -87,6 +87,7 @@ func (bm *batchMock) RetryEvents(events []publisher.Event) { func TestPublish(t *testing.T) { + logger := logptest.NewTestingLogger(t, "") makePublishTestClient := func(t *testing.T, url string) (*Client, *monitoring.Registry) { reg := monitoring.NewRegistry() client, err := NewClient( @@ -96,6 +97,7 @@ func TestPublish(t *testing.T) { indexSelector: testIndexSelector{}, }, nil, + logger, ) require.NoError(t, err) return client, reg @@ -288,11 +290,13 @@ func assertRegistryUint(t *testing.T, reg *monitoring.Registry, key string, expe } func TestCollectPublishFailsNone(t *testing.T) { + logger := logptest.NewTestingLogger(t, "") client, err := NewClient( clientSettings{ observer: outputs.NewNilObserver(), }, nil, + logger, ) assert.NoError(t, err) @@ -315,11 +319,13 @@ func TestCollectPublishFailsNone(t *testing.T) { } func TestCollectPublishFailMiddle(t *testing.T) { + logger := logptest.NewTestingLogger(t, "") client, err := NewClient( clientSettings{ observer: outputs.NewNilObserver(), }, nil, + logger, ) assert.NoError(t, err) @@ -350,12 +356,14 @@ func TestCollectPublishFailMiddle(t *testing.T) { func TestCollectPublishFailDeadLetterSuccess(t *testing.T) { const deadLetterIndex = "test_index" + logger := logptest.NewTestingLogger(t, "") client, err := 
NewClient( clientSettings{ observer: outputs.NewNilObserver(), deadLetterIndex: deadLetterIndex, }, nil, + logger, ) assert.NoError(t, err) @@ -382,12 +390,14 @@ func TestCollectPublishFailFatalErrorNotRetried(t *testing.T) { // Test that a fatal error sending to the dead letter index is reported as // a dropped event, and is not retried forever const deadLetterIndex = "test_index" + logger := logptest.NewTestingLogger(t, "") client, err := NewClient( clientSettings{ observer: outputs.NewNilObserver(), deadLetterIndex: deadLetterIndex, }, nil, + logger, ) assert.NoError(t, err) @@ -411,9 +421,11 @@ func TestCollectPublishFailFatalErrorNotRetried(t *testing.T) { } func TestCollectPublishFailInvalidBulkIndexResponse(t *testing.T) { + logger := logptest.NewTestingLogger(t, "") client, err := NewClient( clientSettings{observer: outputs.NewNilObserver()}, nil, + logger, ) assert.NoError(t, err) @@ -440,6 +452,7 @@ func TestCollectPublishFailInvalidBulkIndexResponse(t *testing.T) { } func TestCollectPublishFailDeadLetterIndex(t *testing.T) { + logger := logptest.NewTestingLogger(t, "") const deadLetterIndex = "test_index" client, err := NewClient( clientSettings{ @@ -447,6 +460,7 @@ func TestCollectPublishFailDeadLetterIndex(t *testing.T) { deadLetterIndex: deadLetterIndex, }, nil, + logger, ) assert.NoError(t, err) @@ -488,12 +502,14 @@ func TestCollectPublishFailDeadLetterIndex(t *testing.T) { } func TestCollectPublishFailDrop(t *testing.T) { + logger := logptest.NewTestingLogger(t, "") client, err := NewClient( clientSettings{ observer: outputs.NewNilObserver(), deadLetterIndex: "", }, nil, + logger, ) assert.NoError(t, err) @@ -536,11 +552,13 @@ func TestCollectPublishFailDrop(t *testing.T) { } func TestCollectPublishFailAll(t *testing.T) { + logger := logptest.NewTestingLogger(t, "") client, err := NewClient( clientSettings{ observer: outputs.NewNilObserver(), }, nil, + logger, ) assert.NoError(t, err) @@ -566,13 +584,14 @@ func TestCollectPublishFailAll(t 
*testing.T) { } func TestCollectPipelinePublishFail(t *testing.T) { - logp.TestingSetup(logp.WithSelectors("elasticsearch")) + logger := logptest.NewTestingLogger(t, "") client, err := NewClient( clientSettings{ observer: outputs.NewNilObserver(), }, nil, + logger, ) assert.NoError(t, err) @@ -618,12 +637,16 @@ func TestCollectPipelinePublishFail(t *testing.T) { } func BenchmarkCollectPublishFailsNone(b *testing.B) { + logger, err := logp.NewDevelopmentLogger("") + require.NoError(b, err) + client, err := NewClient( clientSettings{ observer: outputs.NewNilObserver(), deadLetterIndex: "", }, nil, + logger, ) assert.NoError(b, err) @@ -651,11 +674,14 @@ func BenchmarkCollectPublishFailsNone(b *testing.B) { } func BenchmarkCollectPublishFailMiddle(b *testing.B) { + logger, err := logp.NewDevelopmentLogger("") + require.NoError(b, err) client, err := NewClient( clientSettings{ observer: outputs.NewNilObserver(), }, nil, + logger, ) assert.NoError(b, err) @@ -684,11 +710,14 @@ func BenchmarkCollectPublishFailMiddle(b *testing.B) { } func BenchmarkCollectPublishFailAll(b *testing.B) { + logger, err := logp.NewDevelopmentLogger("") + require.NoError(b, err) client, err := NewClient( clientSettings{ observer: outputs.NewNilObserver(), }, nil, + logger, ) assert.NoError(b, err) @@ -761,6 +790,8 @@ func BenchmarkPublish(b *testing.B) { // Indexing to _bulk api for _, test := range tests { for _, l := range levels { + logger, err := logp.NewDevelopmentLogger("") + require.NoError(b, err) b.Run(fmt.Sprintf("%s with compression level %d", test.Name, l), func(b *testing.B) { client, err := NewClient( clientSettings{ @@ -773,8 +804,8 @@ func BenchmarkPublish(b *testing.B) { CompressionLevel: l, }, }, - nil, + logger, ) assert.NoError(b, err) batch := encodeBatch(client, outest.NewBatch(test.Events...)) @@ -814,6 +845,7 @@ func TestClientWithHeaders(t *testing.T) { })) defer ts.Close() + logger := logptest.NewTestingLogger(t, "") client, err := NewClient(clientSettings{ observer: 
outputs.NewNilObserver(), connection: eslegclient.ConnectionSettings{ @@ -824,7 +856,7 @@ func TestClientWithHeaders(t *testing.T) { }, }, indexSelector: outil.MakeSelector(outil.ConstSelectorExpr("test", outil.SelectorLowerCase)), - }, nil) + }, nil, logger) assert.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -876,9 +908,10 @@ func TestBulkEncodeEvents(t *testing.T) { info := beat.Info{ IndexPrefix: "test", Version: test.version, + Logger: logger, } - im, err := idxmgmt.DefaultSupport(logger, info, c.NewConfig()) + im, err := idxmgmt.DefaultSupport(info, c.NewConfig()) require.NoError(t, err) index, pipeline, err := buildSelectors(im, info, cfg) @@ -891,6 +924,7 @@ func TestBulkEncodeEvents(t *testing.T) { pipelineSelector: pipeline, }, nil, + logger, ) assert.NoError(t, err) @@ -941,13 +975,14 @@ func TestBulkEncodeEventsWithOpType(t *testing.T) { } cfg := c.MustNewConfigFrom(mapstr.M{}) + logger := logptest.NewTestingLogger(t, "") info := beat.Info{ IndexPrefix: "test", Version: version.GetDefaultVersion(), + Logger: logger, } - logger := logptest.NewTestingLogger(t, "") - im, err := idxmgmt.DefaultSupport(logger, info, c.NewConfig()) + im, err := idxmgmt.DefaultSupport(info, c.NewConfig()) require.NoError(t, err) index, pipeline, err := buildSelectors(im, info, cfg) @@ -960,6 +995,7 @@ func TestBulkEncodeEventsWithOpType(t *testing.T) { pipelineSelector: pipeline, }, nil, + logger, ) events := make([]publisher.Event, len(cases)) @@ -1017,13 +1053,15 @@ func TestClientWithAPIKey(t *testing.T) { })) defer ts.Close() + logger := logptest.NewTestingLogger(t, "") + client, err := NewClient(clientSettings{ observer: outputs.NewNilObserver(), connection: eslegclient.ConnectionSettings{ URL: ts.URL, APIKey: "hyokHG4BfWk5viKZ172X:o45JUkyuS--yiSAuuxl8Uw", }, - }, nil) + }, nil, logger) assert.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -1037,6 +1075,8 @@ func TestClientWithAPIKey(t *testing.T) { } func 
TestBulkRequestHasFilterPath(t *testing.T) { + logger := logptest.NewTestingLogger(t, "") + makePublishTestClient := func(t *testing.T, url string, configParams map[string]string) *Client { client, err := NewClient( clientSettings{ @@ -1048,6 +1088,7 @@ func TestBulkRequestHasFilterPath(t *testing.T) { indexSelector: testIndexSelector{}, }, nil, + logger, ) require.NoError(t, err) return client diff --git a/libbeat/outputs/elasticsearch/elasticsearch.go b/libbeat/outputs/elasticsearch/elasticsearch.go index 661aa792a4f..f7727b57c9b 100644 --- a/libbeat/outputs/elasticsearch/elasticsearch.go +++ b/libbeat/outputs/elasticsearch/elasticsearch.go @@ -24,7 +24,6 @@ import ( "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/outputs/outil" "github.com/elastic/elastic-agent-libs/config" - "github.com/elastic/elastic-agent-libs/logp" ) func init() { @@ -39,7 +38,7 @@ func makeES( observer outputs.Observer, cfg *config.C, ) (outputs.Group, error) { - log := logp.NewLogger(logSelector) + log := beatInfo.Logger.Named(logSelector) esConfig := defaultConfig indexSelector, pipelineSelector, err := buildSelectors(im, beatInfo, cfg) if err != nil { @@ -121,7 +120,7 @@ func makeES( pipelineSelector: pipelineSelector, observer: observer, deadLetterIndex: deadLetterIndex, - }, &connectCallbackRegistry) + }, &connectCallbackRegistry, log) if err != nil { return outputs.Fail(err) } diff --git a/libbeat/outputs/fileout/file.go b/libbeat/outputs/fileout/file.go index f650ff3f964..ec14239d641 100644 --- a/libbeat/outputs/fileout/file.go +++ b/libbeat/outputs/fileout/file.go @@ -59,7 +59,7 @@ func makeFileout( } fo := &fileOutput{ - log: logp.NewLogger("file"), + log: beat.Logger.Named("file"), beat: beat, observer: observer, } @@ -91,7 +91,7 @@ func (out *fileOutput) init(beat beat.Info, c fileOutConfig) error { file.MaxBackups(c.NumberOfFiles), file.Permissions(os.FileMode(c.Permissions)), file.RotateOnStartup(c.RotateOnStartup), - 
file.WithLogger(logp.NewLogger("rotator").With(logp.Namespace("rotator"))), + file.WithLogger(beat.Logger.Named("rotator").With(logp.Namespace("rotator"))), ) if err != nil { return err diff --git a/libbeat/outputs/kafka/client.go b/libbeat/outputs/kafka/client.go index 414d82a597d..9944c587867 100644 --- a/libbeat/outputs/kafka/client.go +++ b/libbeat/outputs/kafka/client.go @@ -96,9 +96,10 @@ func newKafkaClient( headers []header, writer codec.Codec, cfg *sarama.Config, + logger *logp.Logger, ) (*client, error) { c := &client{ - log: logp.NewLogger(logSelector), + log: logger.Named(logSelector), observer: observer, hosts: hosts, topic: topic, @@ -173,7 +174,7 @@ func (c *client) Publish(_ context.Context, batch publisher.Batch) error { ref := &msgRef{ client: c, - count: int32(len(events)), + count: int32(len(events)), //nolint:gosec //keep old behavior total: len(events), failed: nil, batch: batch, diff --git a/libbeat/outputs/kafka/config_test.go b/libbeat/outputs/kafka/config_test.go index c93aeb45b2c..b850dcd7b62 100644 --- a/libbeat/outputs/kafka/config_test.go +++ b/libbeat/outputs/kafka/config_test.go @@ -27,7 +27,7 @@ import ( "github.com/elastic/beats/v7/libbeat/internal/testutil" "github.com/elastic/beats/v7/libbeat/management" "github.com/elastic/elastic-agent-libs/config" - "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/logp/logptest" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -49,6 +49,7 @@ func TestConfigAcceptValid(t *testing.T) { test := test t.Run(name, func(t *testing.T) { c := config.MustNewConfigFrom(test) + logger := logptest.NewTestingLogger(t, "") if err := c.SetString("hosts", 0, "localhost"); err != nil { t.Fatalf("could not set 'hosts' on config: %s", err) } @@ -56,7 +57,7 @@ func TestConfigAcceptValid(t *testing.T) { if err != nil { t.Fatalf("Can not create test configuration: %v", err) } - if _, err := newSaramaConfig(logp.L(), cfg); err != nil { + if _, err := newSaramaConfig(logger, 
cfg); err != nil { t.Fatalf("Failure creating sarama config: %v", err) } }) diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go index 02dadafd972..21c23421aab 100644 --- a/libbeat/outputs/kafka/kafka.go +++ b/libbeat/outputs/kafka/kafka.go @@ -45,7 +45,7 @@ func makeKafka( observer outputs.Observer, cfg *config.C, ) (outputs.Group, error) { - log := logp.NewLogger(logSelector) + log := beat.Logger.Named(logSelector) log.Debug("initialize kafka output") kConfig, err := readConfig(cfg) @@ -73,7 +73,7 @@ func makeKafka( return outputs.Fail(err) } - client, err := newKafkaClient(observer, hosts, beat.IndexPrefix, kConfig.Key, topic, kConfig.Headers, codec, libCfg) + client, err := newKafkaClient(observer, hosts, beat.IndexPrefix, kConfig.Key, topic, kConfig.Headers, codec, libCfg, beat.Logger) if err != nil { return outputs.Fail(err) } @@ -93,7 +93,7 @@ func makeKafka( func buildTopicSelector(cfg *config.C) (outil.Selector, error) { if cfg == nil { - return outil.Selector{}, fmt.Errorf("Kafka config cannot be nil") + return outil.Selector{}, fmt.Errorf("Kafka config cannot be nil") //nolint:staticcheck //Keep old behavior } return outil.BuildSelectorFromConfig(cfg, outil.Settings{ diff --git a/libbeat/outputs/kafka/kafka_integration_test.go b/libbeat/outputs/kafka/kafka_integration_test.go index 18b75f1c689..dbf55cf5b40 100644 --- a/libbeat/outputs/kafka/kafka_integration_test.go +++ b/libbeat/outputs/kafka/kafka_integration_test.go @@ -41,7 +41,7 @@ import ( _ "github.com/elastic/beats/v7/libbeat/outputs/codec/json" "github.com/elastic/beats/v7/libbeat/outputs/outest" "github.com/elastic/elastic-agent-libs/config" - "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/logp/logptest" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -56,7 +56,6 @@ type eventInfo struct { } func TestKafkaPublish(t *testing.T) { - logp.TestingSetup(logp.WithSelectors("kafka")) id := strconv.Itoa(rand.Int()) testTopic := 
fmt.Sprintf("test-libbeat-%s", id) @@ -274,7 +273,8 @@ func TestKafkaPublish(t *testing.T) { } t.Run(name, func(t *testing.T) { - grp, err := makeKafka(nil, beat.Info{Beat: "libbeat", IndexPrefix: "testbeat"}, outputs.NewNilObserver(), cfg) + logger := logptest.NewTestingLogger(t, "") + grp, err := makeKafka(nil, beat.Info{Beat: "libbeat", IndexPrefix: "testbeat", Logger: logger}, outputs.NewNilObserver(), cfg) if err != nil { t.Fatal(err) } @@ -319,7 +319,7 @@ func TestKafkaPublish(t *testing.T) { validate := validateJSON if fmt, exists := test.config["codec.format.string"]; exists { - validate = makeValidateFmtStr(fmt.(string)) + validate = makeValidateFmtStr(fmt.(string)) //nolint:errcheck //This is a test file } cfgHeaders, headersSet := test.config["headers"] diff --git a/libbeat/outputs/kafka/partition_test.go b/libbeat/outputs/kafka/partition_test.go index 0a35c2bfdf0..10400a38722 100644 --- a/libbeat/outputs/kafka/partition_test.go +++ b/libbeat/outputs/kafka/partition_test.go @@ -33,7 +33,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/elastic-agent-libs/config" - "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/logp/logptest" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -200,7 +200,8 @@ func TestPartitioners(t *testing.T) { continue } - constr, err := makePartitioner(logp.L(), pcfg.Partition) + logger := logptest.NewTestingLogger(t, "") + constr, err := makePartitioner(logger, pcfg.Partition) if err != nil { t.Error(err) continue diff --git a/libbeat/outputs/logstash/async.go b/libbeat/outputs/logstash/async.go index 4abef08d1df..07510092a8c 100644 --- a/libbeat/outputs/logstash/async.go +++ b/libbeat/outputs/logstash/async.go @@ -63,7 +63,7 @@ func newAsyncClient( config *Config, ) (*asyncClient, error) { - log := logp.NewLogger("logstash") + log := beat.Logger.Named("logstash") c := &asyncClient{ log: log, Client: conn, @@ -92,7 
+92,7 @@ func newAsyncClient( } c.connect = func() error { - err := c.Client.ConnectContext(context.Background()) + err := c.ConnectContext(context.Background()) if err == nil { c.client, err = clientFactory(c.Client) } diff --git a/libbeat/outputs/logstash/async_test.go b/libbeat/outputs/logstash/async_test.go index 12d2edd124c..ccd03d3ebde 100644 --- a/libbeat/outputs/logstash/async_test.go +++ b/libbeat/outputs/logstash/async_test.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/outputs/outest" + "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/transport" ) @@ -54,7 +55,11 @@ func makeAsyncTestClient(conn *transport.Client) testClientDriver { config := defaultConfig() config.Timeout = 1 * time.Second config.Pipelining = 3 - client, err := newAsyncClient(beat.Info{}, conn, outputs.NewNilObserver(), &config) + logger, err := logp.NewDevelopmentLogger("") + if err != nil { + panic(err) + } + client, err := newAsyncClient(beat.Info{Logger: logger}, conn, outputs.NewNilObserver(), &config) if err != nil { panic(err) } diff --git a/libbeat/outputs/logstash/client_test.go b/libbeat/outputs/logstash/client_test.go index 0b48fb07fb6..c39bf1fefa6 100644 --- a/libbeat/outputs/logstash/client_test.go +++ b/libbeat/outputs/logstash/client_test.go @@ -64,7 +64,6 @@ type testDriverCommand struct { const testMaxWindowSize = 64 func testSendZero(t *testing.T, factory clientFactory) { - enableLogging([]string{"*"}) server := transptest.NewMockServerTCP(t, 1*time.Second, "", nil) defer server.Close() @@ -91,7 +90,6 @@ func testSendZero(t *testing.T, factory clientFactory) { } func testSimpleEvent(t *testing.T, factory clientFactory) { - enableLogging([]string{"*"}) mock := transptest.NewMockServerTCP(t, 1*time.Second, "", nil) server, _ := v2.NewWithListener(mock.Listener) defer server.Close() @@ -125,7 +123,6 @@ func testSimpleEvent(t 
*testing.T, factory clientFactory) { } func testSimpleEventWithTTL(t *testing.T, factory clientFactory) { - enableLogging([]string{"*"}) mock := transptest.NewMockServerTCP(t, 1*time.Second, "", nil) server, _ := v2.NewWithListener(mock.Listener) defer server.Close() @@ -177,7 +174,6 @@ func testSimpleEventWithTTL(t *testing.T, factory clientFactory) { } func testStructuredEvent(t *testing.T, factory clientFactory) { - enableLogging([]string{"*"}) mock := transptest.NewMockServerTCP(t, 1*time.Second, "", nil) server, _ := v2.NewWithListener(mock.Listener) defer server.Close() diff --git a/libbeat/outputs/logstash/common_test.go b/libbeat/outputs/logstash/common_test.go deleted file mode 100644 index 896df2ab734..00000000000 --- a/libbeat/outputs/logstash/common_test.go +++ /dev/null @@ -1,26 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package logstash - -import ( - "github.com/elastic/elastic-agent-libs/logp" -) - -func enableLogging(selectors []string) { - logp.TestingSetup(logp.WithSelectors(selectors...)) -} diff --git a/libbeat/outputs/logstash/deadlock_test.go b/libbeat/outputs/logstash/deadlock_test.go index 137a261859e..7537dcfdb4a 100644 --- a/libbeat/outputs/logstash/deadlock_test.go +++ b/libbeat/outputs/logstash/deadlock_test.go @@ -24,7 +24,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/logp/logptest" ) func TestDeadlockListener(t *testing.T) { @@ -32,7 +32,8 @@ func TestDeadlockListener(t *testing.T) { var currentTime time.Time getTime := func() time.Time { return currentTime } - dl := idleDeadlockListener(logp.NewLogger("test"), timeout, getTime) + logger := logptest.NewTestingLogger(t, "") + dl := idleDeadlockListener(logger, timeout, getTime) // Channels get a buffer so we can trigger them deterministically in // one goroutine. 
diff --git a/libbeat/outputs/logstash/logstash_integration_test.go b/libbeat/outputs/logstash/logstash_integration_test.go index f368a43a9a6..66c3c1fc571 100644 --- a/libbeat/outputs/logstash/logstash_integration_test.go +++ b/libbeat/outputs/logstash/logstash_integration_test.go @@ -29,6 +29,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/fmtstr" @@ -71,11 +72,6 @@ type esSource interface { RefreshIndex() } -type esValueReader interface { - esSource - Read() ([]map[string]interface{}, error) -} - type esCountReader interface { esSource Count() (int, error) @@ -193,8 +189,8 @@ func newTestElasticsearchOutput(t *testing.T, test string) *testOutputer { }) logger := logptest.NewTestingLogger(t, "") - info := beat.Info{Beat: "libbeat"} - im, err := idxmgmt.DefaultSupport(logger, info, conf.MustNewConfigFrom( + info := beat.Info{Beat: "libbeat", Logger: logger} + im, err := idxmgmt.DefaultSupport(info, conf.MustNewConfigFrom( map[string]interface{}{ "setup.ilm.enabled": false, }, @@ -209,15 +205,15 @@ func newTestElasticsearchOutput(t *testing.T, test string) *testOutputer { } es := &testOutputer{} - es.NetworkClient = grp.Clients[0].(outputs.NetworkClient) + es.NetworkClient = grp.Clients[0].(outputs.NetworkClient) //nolint:errcheck //safe to ignore in tests es.esConnection = connection // The Elasticsearch output requires events to be encoded // before calling Publish, so create an event encoder. 
es.encoder = grp.EncoderFactory() ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - es.Connect(ctx) - + err = es.Connect(ctx) + require.NoError(t, err) return es } @@ -243,14 +239,17 @@ func (es *esConnection) Read() ([]map[string]interface{}, error) { hits := make([]map[string]interface{}, len(resp.Hits.Hits)) for i, hit := range resp.Hits.Hits { - json.Unmarshal(hit, &hits[i]) + json.Unmarshal(hit, &hits[i]) //nolint:errcheck //This is a test file, can ignore } return hits, err } func (es *esConnection) RefreshIndex() { - es.Refresh(es.index) + _, _, err := es.Refresh(es.index) + if err != nil { + es.t.Errorf("Failed to refresh: %s", err) + } } func (es *esConnection) Count() (int, error) { @@ -288,17 +287,6 @@ func checkIndex(reader esCountReader, minValues int) func() bool { } } -func checkAll(checks ...func() bool) func() bool { - return func() bool { - for _, check := range checks { - if !check() { - return false - } - } - return true - } -} - func TestSendMessageViaLogstashTCP(t *testing.T) { testSendMessageViaLogstash(t, "basic-tcp", false) } @@ -308,7 +296,6 @@ func TestSendMessageViaLogstashTLS(t *testing.T) { } func testSendMessageViaLogstash(t *testing.T, name string, tls bool) { - enableLogging([]string{"*"}) ls := newTestLogstashOutput(t, name, tls) defer ls.Cleanup() @@ -322,7 +309,8 @@ func testSendMessageViaLogstash(t *testing.T, name string, tls bool) { }, }, ) - ls.Publish(context.Background(), batch) + err := ls.Publish(context.Background(), batch) + require.NoError(t, err) // wait for logstash event flush + elasticsearch waitUntilTrue(5*time.Second, checkIndex(ls, 1)) @@ -505,9 +493,6 @@ func TestLogstashElasticOutputPluginBulkCompatibleMessageTLS(t *testing.T) { } func testLogstashElasticOutputPluginBulkCompatibleMessage(t *testing.T, name string, tls bool) { - if testing.Verbose() { - enableLogging([]string{"*"}) - } timeout := 10 * time.Second @@ -556,8 +541,10 @@ func 
testLogstashElasticOutputPluginBulkCompatibleMessage(t *testing.T, name str } func checkEvent(t *testing.T, ls, es map[string]interface{}) { - lsEvent := ls["_source"].(map[string]interface{}) - esEvent := es["_source"].(map[string]interface{}) + lsEvent, ok := ls["_source"].(map[string]interface{}) + assert.True(t, ok) + esEvent, ok := es["_source"].(map[string]interface{}) + assert.True(t, ok) commonFields := []string{"@timestamp", "host", "type", "message"} for _, field := range commonFields { assert.NotNil(t, lsEvent[field]) @@ -568,7 +555,7 @@ func checkEvent(t *testing.T, ls, es map[string]interface{}) { func (t *testOutputer) PublishEvent(event beat.Event) { batch := encodeBatch[*outest.Batch](t.encoder, outest.NewBatch(event)) - t.Publish(context.Background(), batch) + t.Publish(context.Background(), batch) //nolint:errcheck //This is a test file } func (t *testOutputer) BulkPublish(events []beat.Event) bool { @@ -582,7 +569,7 @@ func (t *testOutputer) BulkPublish(events []beat.Event) bool { wg.Done() } - t.Publish(context.Background(), batch) + t.Publish(context.Background(), batch) //nolint:errcheck //This is a test file wg.Wait() return ok } @@ -603,7 +590,7 @@ func encodeEvents(encoder queue.Encoder, events []publisher.Event) []publisher.E // Skip encoding if there's already encoded data present if events[i].EncodedEvent == nil { encoded, _ := encoder.EncodeEntry(events[i]) - event := encoded.(publisher.Event) + event := encoded.(publisher.Event) //nolint:errcheck //This is a test file, can ignore events[i] = event } } diff --git a/libbeat/outputs/logstash/logstash_test.go b/libbeat/outputs/logstash/logstash_test.go index 5be2054cf2a..dcd615c1b03 100644 --- a/libbeat/outputs/logstash/logstash_test.go +++ b/libbeat/outputs/logstash/logstash_test.go @@ -25,6 +25,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" @@ -32,6 
+33,7 @@ import ( "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/outputs/outest" conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp/logptest" "github.com/elastic/elastic-agent-libs/mapstr" v2 "github.com/elastic/go-lumber/server/v2" ) @@ -42,7 +44,6 @@ const ( ) func TestLogstashTCP(t *testing.T) { - enableLogging([]string{"*"}) timeout := 2 * time.Second server := transptest.NewMockServerTCP(t, timeout, "", nil) @@ -56,12 +57,12 @@ func TestLogstashTCP(t *testing.T) { } func TestLogstashTLS(t *testing.T) { - enableLogging([]string{"*"}) certName := "ca_test" timeout := 2 * time.Second - transptest.GenCertForTestingPurpose(t, certName, "", "127.0.0.1", "127.0.1.1") + err := transptest.GenCertForTestingPurpose(t, certName, "", "127.0.0.1", "127.0.1.1") + require.NoError(t, err) server := transptest.NewMockServerTLS(t, timeout, certName, nil) // create lumberjack output client @@ -79,7 +80,8 @@ func TestLogstashInvalidTLSInsecure(t *testing.T) { ip := "1.2.3.4" timeout := 2 * time.Second - transptest.GenCertForTestingPurpose(t, certName, "", ip) + err := transptest.GenCertForTestingPurpose(t, certName, "", ip) + require.NoError(t, err) server := transptest.NewMockServerTLS(t, timeout, certName, nil) config := map[string]interface{}{ @@ -145,7 +147,8 @@ func testConnectionType( events := batch.Events assert.Equal(t, 1, len(events)) - msg := events[0].(map[string]interface{}) + msg, ok := events[0].(map[string]interface{}) + assert.True(t, ok) assert.Equal(t, 10.0, msg["extra"]) assert.Equal(t, "message", msg["message"]) } @@ -182,15 +185,16 @@ func newTestLumberjackOutput( } } + logger := logptest.NewTestingLogger(t, "") cfg, _ := conf.NewConfigFrom(config) - grp, err := outputs.Load(nil, beat.Info{}, nil, "logstash", cfg) + grp, err := outputs.Load(nil, beat.Info{Logger: logger}, nil, "logstash", cfg) if err != nil { t.Fatalf("init logstash output plugin failed: %v", err) } ctx, cancel 
:= context.WithCancel(context.Background()) defer cancel() - client := grp.Clients[0].(outputs.NetworkClient) + client := grp.Clients[0].(outputs.NetworkClient) //nolint:errcheck //safe to ignore if err := client.Connect(ctx); err != nil { t.Fatalf("Client failed to connected: %v", err) } diff --git a/libbeat/outputs/logstash/sync.go b/libbeat/outputs/logstash/sync.go index 9f74fbf4967..1a0456648cf 100644 --- a/libbeat/outputs/logstash/sync.go +++ b/libbeat/outputs/logstash/sync.go @@ -45,7 +45,7 @@ func newSyncClient( observer outputs.Observer, config *Config, ) (*syncClient, error) { - log := logp.NewLogger("logstash") + log := beat.Logger.Named("logstash") c := &syncClient{ log: log, Client: conn, @@ -76,7 +76,7 @@ func newSyncClient( func (c *syncClient) Connect(ctx context.Context) error { c.log.Debug("connect") - err := c.Client.ConnectContext(ctx) + err := c.ConnectContext(ctx) if err != nil { return err } diff --git a/libbeat/outputs/logstash/sync_test.go b/libbeat/outputs/logstash/sync_test.go index 0d8a3e0f513..f7fa67e966c 100644 --- a/libbeat/outputs/logstash/sync_test.go +++ b/libbeat/outputs/logstash/sync_test.go @@ -29,6 +29,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common/transport/transptest" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/outputs/outest" + "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/transport" ) @@ -67,7 +68,11 @@ func makeTestClient(conn *transport.Client) testClientDriver { config := defaultConfig() config.Timeout = 1 * time.Second config.TTL = 5 * time.Second - client, err := newSyncClient(beat.Info{}, conn, outputs.NewNilObserver(), &config) + logger, err := logp.NewDevelopmentLogger("") + if err != nil { + panic(err) + } + client, err := newSyncClient(beat.Info{Logger: logger}, conn, outputs.NewNilObserver(), &config) if err != nil { panic(err) } diff --git a/libbeat/outputs/logstash/window_test.go b/libbeat/outputs/logstash/window_test.go 
index ff799774334..0865e096025 100644 --- a/libbeat/outputs/logstash/window_test.go +++ b/libbeat/outputs/logstash/window_test.go @@ -26,7 +26,6 @@ import ( ) func TestShrinkWindowSizeNeverZero(t *testing.T) { - enableLogging([]string{"logstash"}) windowSize := 124 var w window @@ -68,7 +67,6 @@ func TestGrowWindowSizeToMaxOKOnly(t *testing.T) { func testGrowWindowSize(t *testing.T, initial, maxOK, windowSize, batchSize, expected int, ) { - enableLogging([]string{"logstash"}) var w window w.init(initial, windowSize) w.maxOkWindowSize = maxOK diff --git a/libbeat/outputs/redis/client.go b/libbeat/outputs/redis/client.go index db3ec5a3b43..b5bbbeb1b2b 100644 --- a/libbeat/outputs/redis/client.go +++ b/libbeat/outputs/redis/client.go @@ -75,9 +75,10 @@ func newClient( pass string, db int, key outil.Selector, dt redisDataType, index string, codec codec.Codec, + logger *logp.Logger, ) *client { return &client{ - log: logp.NewLogger("redis"), + log: logger.Named("redis"), Client: tc, observer: observer, timeout: timeout, diff --git a/libbeat/outputs/redis/redis.go b/libbeat/outputs/redis/redis.go index d0cba1e7061..36ba223b334 100644 --- a/libbeat/outputs/redis/redis.go +++ b/libbeat/outputs/redis/redis.go @@ -87,7 +87,7 @@ func makeRedis( case "channel": dataType = redisChannelType default: - return outputs.Fail(errors.New("Bad Redis data type")) + return outputs.Fail(errors.New("Bad Redis data type")) //nolint:staticcheck //Keep old behavior } key, err := buildKeySelector(cfg) @@ -161,7 +161,7 @@ func makeRedis( } client := newClient(conn, observer, rConfig.Timeout, - pass, rConfig.Db, key, dataType, rConfig.Index, enc) + pass, rConfig.Db, key, dataType, rConfig.Index, enc, beat.Logger) clients[i] = newBackoffClient(client, rConfig.Backoff.Init, rConfig.Backoff.Max) } diff --git a/libbeat/outputs/redis/redis_integration_test.go b/libbeat/outputs/redis/redis_integration_test.go index 6fd3e09397a..da1ff675fa0 100644 --- a/libbeat/outputs/redis/redis_integration_test.go 
+++ b/libbeat/outputs/redis/redis_integration_test.go @@ -30,6 +30,7 @@ import ( "github.com/gomodule/redigo/redis" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/outputs" @@ -37,6 +38,7 @@ import ( _ "github.com/elastic/beats/v7/libbeat/outputs/codec/json" "github.com/elastic/beats/v7/libbeat/outputs/outest" conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp/logptest" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -133,9 +135,17 @@ func testPublishList(t *testing.T, cfg map[string]interface{}) { total := batches & batchSize db := 0 - key := cfg["key"].(string) + key, ok := cfg["key"].(string) + if !ok { + t.Fatalf("expected string for key, but got %T", cfg["key"]) + } + if v, ok := cfg["db"]; ok { - db = v.(int) + if dbValue, ok := v.(int); ok { + db = dbValue + } else { + t.Fatalf("expected int for db, but got %T", v) + } } conn, err := redis.Dial("tcp", getRedisAddr(), redis.DialDatabase(db)) @@ -145,7 +155,8 @@ func testPublishList(t *testing.T, cfg map[string]interface{}) { // delete old key if present defer conn.Close() - conn.Do("DEL", key) + _, err = conn.Do("DEL", key) + require.NoError(t, err) out := newRedisTestingOutput(t, cfg) err = sendTestEvents(out, batches, batchSize) @@ -225,9 +236,17 @@ func testPublishChannel(t *testing.T, cfg map[string]interface{}) { total := batches & batchSize db := 0 - key := cfg["key"].(string) + key, ok := cfg["key"].(string) + if !ok { + t.Fatalf("expected string for key, but got %T", cfg["key"]) + } + if v, ok := cfg["db"]; ok { - db = v.(int) + if dbValue, ok := v.(int); ok { + db = dbValue + } else { + t.Fatalf("expected int for db, but got %T", v) + } } conn, err := redis.Dial("tcp", getRedisAddr(), redis.DialDatabase(db)) @@ -237,14 +256,15 @@ func testPublishChannel(t *testing.T, cfg map[string]interface{}) { // delete old key if present defer conn.Close() 
- conn.Do("DEL", key) + _, err = conn.Do("DEL", key) + require.NoError(t, err) // subscribe to packetbeat channel psc := redis.PubSubConn{Conn: conn} if err := psc.Subscribe(key); err != nil { t.Fatal(err) } - defer psc.Unsubscribe(key) + defer psc.Unsubscribe(key) //nolint:errcheck //This is a test file // connect and publish events var wg sync.WaitGroup @@ -330,12 +350,16 @@ func newRedisTestingOutput(t *testing.T, cfg map[string]interface{}) outputs.Cli t.Fatalf("redis output module not registered") } - out, err := plugin(nil, beat.Info{Beat: testBeatname, Version: testBeatversion}, outputs.NewNilObserver(), config) + logger := logptest.NewTestingLogger(t, "") + out, err := plugin(nil, beat.Info{Beat: testBeatname, Version: testBeatversion, Logger: logger}, outputs.NewNilObserver(), config) if err != nil { t.Fatalf("Failed to initialize redis output: %v", err) } - client := out.Clients[0].(outputs.NetworkClient) + client, ok := out.Clients[0].(outputs.NetworkClient) + if !ok { + t.Fatalf("expected outputs.NetworkClient, but got %T", out.Clients[0]) + } ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) if err := client.Connect(ctx); err != nil { diff --git a/libbeat/outputs/redis/redis_test.go b/libbeat/outputs/redis/redis_test.go index 6e9d70f5786..33cd266c403 100644 --- a/libbeat/outputs/redis/redis_test.go +++ b/libbeat/outputs/redis/redis_test.go @@ -26,6 +26,7 @@ import ( "github.com/elastic/beats/v7/libbeat/outputs" _ "github.com/elastic/beats/v7/libbeat/outputs/codec/json" "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp/logptest" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -47,7 +48,7 @@ func clientsLen(required int) checker { func clientPassword(index int, pass string) checker { return func(t *testing.T, group outputs.Group) { - redisClient := group.Clients[index].(*backoffClient) + redisClient := group.Clients[index].(*backoffClient) //nolint:errcheck //This is a test file, can 
ignore assert.Equal(t, redisClient.client.password, pass) } } @@ -106,6 +107,8 @@ func TestMakeRedis(t *testing.T) { beatInfo := beat.Info{Beat: "libbeat", Version: "1.2.3"} for name, test := range tests { t.Run(name, func(t *testing.T) { + logger := logptest.NewTestingLogger(t, "") + beatInfo.Logger = logger cfg, err := config.NewConfigFrom(test.config) assert.NoError(t, err) groups, err := makeRedis(nil, beatInfo, outputs.NewNilObserver(), cfg) diff --git a/libbeat/outputs/util_test.go b/libbeat/outputs/util_test.go index 3835937d822..64bc8a4f54a 100644 --- a/libbeat/outputs/util_test.go +++ b/libbeat/outputs/util_test.go @@ -92,8 +92,7 @@ func TestDiskQueueUnderAgent(t *testing.T) { require.NotNil(t, actualGroup) require.NotNil(t, actualGroup.QueueFactory) - testlogger, _ := logp.NewInMemory("test-diskqueue", zapcore.EncoderConfig{}) - + testlogger, _ := logp.NewInMemoryLocal("test-diskqueue", zapcore.EncoderConfig{}) actualQueue, err := actualGroup.QueueFactory(testlogger, nil, 1, nil) require.NoError(t, err) require.NotNil(t, actualQueue) diff --git a/libbeat/template/load.go b/libbeat/template/load.go index c2c1375664d..be37fdfa54d 100644 --- a/libbeat/template/load.go +++ b/libbeat/template/load.go @@ -21,7 +21,6 @@ import ( "encoding/json" "errors" "fmt" - "io/ioutil" "net/http" "os" @@ -77,23 +76,23 @@ type templateBuilder struct { } // NewESLoader creates a new template loader for ES -func NewESLoader(client ESClient, lifecycleClient lifecycle.ClientHandler) (*ESLoader, error) { +func NewESLoader(client ESClient, lifecycleClient lifecycle.ClientHandler, logger *logp.Logger) (*ESLoader, error) { if client == nil { return nil, errors.New("can not load template without active Elasticsearch client") } return &ESLoader{client: client, lifecycleClient: lifecycleClient, - builder: newTemplateBuilder(client.IsServerless()), log: logp.NewLogger("template_loader")}, nil + builder: newTemplateBuilder(client.IsServerless(), logger), log: 
logger.Named("template_loader")}, nil } // NewFileLoader creates a new template loader for the given file. -func NewFileLoader(c FileClient, isServerless bool) *FileLoader { +func NewFileLoader(c FileClient, isServerless bool, logger *logp.Logger) *FileLoader { // other components of the file loader will fail if both ILM and DSL are set, // so at this point it's fairly safe to just pass cfg.DSL.Enabled - return &FileLoader{client: c, builder: newTemplateBuilder(isServerless), log: logp.NewLogger("file_template_loader")} + return &FileLoader{client: c, builder: newTemplateBuilder(isServerless, logger), log: logger.Named("file_template_loader")} } -func newTemplateBuilder(serverlessMode bool) *templateBuilder { - return &templateBuilder{log: logp.NewLogger("template"), isServerless: serverlessMode} +func newTemplateBuilder(serverlessMode bool, logger *logp.Logger) *templateBuilder { + return &templateBuilder{log: logger.Named("template"), isServerless: serverlessMode} } // Load checks if the index mapping template should be loaded. 
@@ -282,7 +281,7 @@ func (b *templateBuilder) buildBodyFromJSON(config TemplateConfig) (mapstr.M, er return nil, fmt.Errorf("error checking json file %s for template: %w", jsonPath, err) } b.log.Debugf("Loading json template from file %s", jsonPath) - content, err := ioutil.ReadFile(jsonPath) + content, err := os.ReadFile(jsonPath) if err != nil { return nil, fmt.Errorf("error reading file %s for template: %w", jsonPath, err) diff --git a/libbeat/template/load_integration_test.go b/libbeat/template/load_integration_test.go index 85fb777e935..6ebebed6f8f 100644 --- a/libbeat/template/load_integration_test.go +++ b/libbeat/template/load_integration_test.go @@ -41,6 +41,7 @@ import ( "github.com/elastic/beats/v7/libbeat/esleg/eslegtest" "github.com/elastic/beats/v7/libbeat/idxmgmt/lifecycle" "github.com/elastic/beats/v7/libbeat/version" + "github.com/elastic/elastic-agent-libs/logp/logptest" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/transport/httpcommon" ) @@ -69,7 +70,8 @@ func newTestSetup(t *testing.T, cfg TemplateConfig) *testSetup { t.Fatal(err) } handler := &mockClientHandler{serverless: false, mode: lifecycle.ILM} - loader, err := NewESLoader(client, handler) + logger := logptest.NewTestingLogger(t, "") + loader, err := NewESLoader(client, handler, logger) require.NoError(t, err) s := testSetup{t: t, client: client, loader: loader, config: cfg} // don't care if the cleanup fails, since they might just return a 404 @@ -86,7 +88,8 @@ func newTestSetupWithESClient(t *testing.T, client ESClient, cfg TemplateConfig) cfg.Name = fmt.Sprintf("load-test-%+v", rand.Int()) } handler := &mockClientHandler{serverless: false, mode: lifecycle.ILM} - loader, err := NewESLoader(client, handler) + logger := logptest.NewTestingLogger(t, "") + loader, err := NewESLoader(client, handler, logger) require.NoError(t, err) return &testSetup{t: t, client: client, loader: loader, config: cfg} } @@ -504,7 +507,7 @@ func getTemplate(t *testing.T, 
client ESClient, templateName string) testTemplat return testTemplate{ t: t, client: client, - M: mapstr.M(templateElem["index_template"].(map[string]interface{})), + M: mapstr.M(templateElem["index_template"].(map[string]interface{})), //nolint:errcheck //This is a test file } } @@ -523,14 +526,14 @@ func (tt *testTemplate) SourceEnabled() bool { tt.t.Fatalf("failed to read '%v' in %s", key, doc) } - return val.(bool) + return val.(bool) //nolint:errcheck //This is a test file } func (tt *testTemplate) NumberOfShards() int { val, err := tt.GetValue("template.settings.index.number_of_shards") require.NoError(tt.t, err) - i, err := strconv.Atoi(val.(string)) + i, err := strconv.Atoi(val.(string)) //nolint:errcheck //safe to ignore require.NoError(tt.t, err) return i } diff --git a/libbeat/template/load_test.go b/libbeat/template/load_test.go index db82384539c..8fb59486d7f 100644 --- a/libbeat/template/load_test.go +++ b/libbeat/template/load_test.go @@ -23,6 +23,7 @@ import ( "testing" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/elastic-agent-libs/logp/logptest" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/version" @@ -226,7 +227,8 @@ func TestFileLoader_Load(t *testing.T) { } { t.Run(name, func(t *testing.T) { fc := newFileClient(ver) - fl := NewFileLoader(fc, test.isServerless) + logger := logptest.NewTestingLogger(t, "") + fl := NewFileLoader(fc, test.isServerless, logger) cfg := DefaultConfig(info) cfg.Settings = test.settings diff --git a/metricbeat/beater/metricbeat.go b/metricbeat/beater/metricbeat.go index 8deab8f27e4..a3eada4c2c3 100644 --- a/metricbeat/beater/metricbeat.go +++ b/metricbeat/beater/metricbeat.go @@ -53,6 +53,7 @@ type Metricbeat struct { // Options moduleOptions []module.Option + logger *logp.Logger } // Option specifies some optional arguments used for configuring the behavior @@ -152,6 +153,7 @@ func newMetricbeat(b *beat.Beat, c *conf.C, registry *mb.Register, options ...Op 
done: make(chan struct{}), config: config, registry: registry, + logger: b.Info.Logger, } for _, applyOption := range options { applyOption(metricbeat) @@ -214,6 +216,7 @@ func newMetricbeat(b *beat.Beat, c *conf.C, registry *mb.Register, options ...Op factory, autodiscover.QueryConfig(), config.Autodiscover, b.Keystore, + b.Info.Logger, ) if err != nil { return nil, err @@ -246,7 +249,7 @@ func (bt *Metricbeat) Run(b *beat.Beat) error { // Centrally managed modules factory := module.NewFactory(b.Info, bt.registry, bt.moduleOptions...) - modules := cfgfile.NewRunnerList(management.DebugK, factory, b.Publisher) + modules := cfgfile.NewRunnerList(management.DebugK, factory, b.Publisher, bt.logger) b.Registry.MustRegisterInput(modules) wg.Add(1) go func() { @@ -264,7 +267,7 @@ func (bt *Metricbeat) Run(b *beat.Beat) error { // Dynamic file based modules (metricbeat.config.modules) if bt.config.ConfigModules.Enabled() { - moduleReloader := cfgfile.NewReloader(b.Publisher, bt.config.ConfigModules) + moduleReloader := cfgfile.NewReloader(bt.logger.Named("module.reload"), b.Publisher, bt.config.ConfigModules) if err := moduleReloader.Check(factory); err != nil { return err @@ -307,5 +310,5 @@ func (bt *Metricbeat) Stop() { // Modules return a list of all configured modules. func (bt *Metricbeat) Modules() ([]*module.Wrapper, error) { - return module.ConfiguredModules(bt.registry, bt.config.Modules, bt.config.ConfigModules, bt.moduleOptions) + return module.ConfiguredModules(bt.registry, bt.config.Modules, bt.config.ConfigModules, bt.moduleOptions, bt.logger) } diff --git a/metricbeat/cmd/modules.go b/metricbeat/cmd/modules.go index 75cd807cb8b..a5d5717f086 100644 --- a/metricbeat/cmd/modules.go +++ b/metricbeat/cmd/modules.go @@ -39,7 +39,7 @@ func BuildModulesManager(beat *beat.Beat) (cmd.ModulesManager, error) { return nil, fmt.Errorf("wrong settings for config.modules.path, it is expected to end with *.yml. 
Got: %s", glob) } - modulesManager, err := cfgfile.NewGlobManager(glob, ".yml", ".disabled") + modulesManager, err := cfgfile.NewGlobManager(glob, ".yml", ".disabled", beat.Info.Logger) if err != nil { return nil, fmt.Errorf("initialization error: %w", err) } diff --git a/metricbeat/mb/module/configuration.go b/metricbeat/mb/module/configuration.go index 04031e42e7c..72dca94800b 100644 --- a/metricbeat/mb/module/configuration.go +++ b/metricbeat/mb/module/configuration.go @@ -23,10 +23,11 @@ import ( "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/metricbeat/mb" conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" ) // ConfiguredModules returns a list of all configured modules, including anyone present under dynamic config settings. -func ConfiguredModules(registry *mb.Register, modulesData []*conf.C, configModulesData *conf.C, moduleOptions []Option) ([]*Wrapper, error) { +func ConfiguredModules(registry *mb.Register, modulesData []*conf.C, configModulesData *conf.C, moduleOptions []Option, logger *logp.Logger) ([]*Wrapper, error) { var modules []*Wrapper //nolint:prealloc //can't be preallocated for _, moduleCfg := range modulesData { @@ -44,7 +45,7 @@ func ConfiguredModules(registry *mb.Register, modulesData []*conf.C, configModul return nil, err } - modulesManager, err := cfgfile.NewGlobManager(config.Path, ".yml", ".disabled") + modulesManager, err := cfgfile.NewGlobManager(config.Path, ".yml", ".disabled", logger) if err != nil { return nil, fmt.Errorf("initialization error: %w", err) } diff --git a/packetbeat/beater/packetbeat.go b/packetbeat/beater/packetbeat.go index 7915127cbe2..3eeb92e41ce 100644 --- a/packetbeat/beater/packetbeat.go +++ b/packetbeat/beater/packetbeat.go @@ -210,7 +210,7 @@ func (pb *packetbeat) runStatic(b *beat.Beat, factory *processorFactory) error { // runManaged registers a packetbeat runner with the reload.Registry and starts // the runner by starting the 
beat's manager. It returns on the first fatal error. func (pb *packetbeat) runManaged(b *beat.Beat, factory *processorFactory) error { - runner := newReloader(management.DebugK, factory, b.Publisher) + runner := newReloader(management.DebugK, factory, b.Publisher, b.Info.Logger) b.Registry.MustRegisterInput(runner) logp.Debug("main", "Waiting for the runner to finish") diff --git a/packetbeat/beater/reloader.go b/packetbeat/beater/reloader.go index c6925e2fa95..6a34473cad2 100644 --- a/packetbeat/beater/reloader.go +++ b/packetbeat/beater/reloader.go @@ -23,15 +23,16 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/common/reload" + "github.com/elastic/elastic-agent-libs/logp" ) type reloader struct { *cfgfile.RunnerList } -func newReloader(name string, factory *processorFactory, pipeline beat.PipelineConnector) *reloader { +func newReloader(name string, factory *processorFactory, pipeline beat.PipelineConnector, logger *logp.Logger) *reloader { return &reloader{ - RunnerList: cfgfile.NewRunnerList(name, factory, pipeline), + RunnerList: cfgfile.NewRunnerList(name, factory, pipeline, logger), } } diff --git a/x-pack/libbeat/common/cloudfoundry/cache.go b/x-pack/libbeat/common/cloudfoundry/cache.go index 4c43467eefa..4e76ab43975 100644 --- a/x-pack/libbeat/common/cloudfoundry/cache.go +++ b/x-pack/libbeat/common/cloudfoundry/cache.go @@ -43,7 +43,7 @@ func newClientCacheWrap(client cfClient, cacheName string, ttl time.Duration, er name = name + "-" + sanitizeCacheName(cacheName) } - cache, err := persistentcache.New(name, options) + cache, err := persistentcache.New(name, options, log) if err != nil { return nil, fmt.Errorf("creating metadata cache: %w", err) } diff --git a/x-pack/libbeat/common/cloudfoundry/cache_test.go b/x-pack/libbeat/common/cloudfoundry/cache_test.go index dfc12ea5318..60764bcf7b8 100644 --- a/x-pack/libbeat/common/cloudfoundry/cache_test.go +++ 
b/x-pack/libbeat/common/cloudfoundry/cache_test.go @@ -16,7 +16,7 @@ import ( "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/testing/testutils" - "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/logp/logptest" ) func TestClientCacheWrap(t *testing.T) { @@ -32,7 +32,8 @@ func TestClientCacheWrap(t *testing.T) { Name: "Foo", // use this field to track if from cache or from client } fakeClient := &fakeCFClient{app, 0} - cache, err := newClientCacheWrap(fakeClient, "test", ttl, ttl, logp.NewLogger("cloudfoundry")) + logger := logptest.NewTestingLogger(t, "") + cache, err := newClientCacheWrap(fakeClient, "test", ttl, ttl, logger.Named("cloudfoundry")) require.NoError(t, err) missingAppGuid := mustCreateFakeGuid() diff --git a/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go b/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go index 94aec3d900f..2401ea7437e 100644 --- a/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go +++ b/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go @@ -46,7 +46,7 @@ func makeOtelConsumer(_ outputs.IndexManager, beat beat.Info, observer outputs.O observer: observer, logsConsumer: beat.LogConsumer, beatInfo: beat, - log: logp.NewLogger("otelconsumer"), + log: beat.Logger.Named("otelconsumer"), } ocConfig := defaultConfig() diff --git a/x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go b/x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go index 41c82b41c56..8eeb76a316e 100644 --- a/x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go +++ b/x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go @@ -21,7 +21,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/outputs/outest" - "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/logp/logptest" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -37,15 +37,14 @@ func TestPublish(t *testing.T) { 
makeOtelConsumer := func(t *testing.T, consumeFn func(ctx context.Context, ld plog.Logs) error) *otelConsumer { t.Helper() - assert.NoError(t, logp.TestingSetup(logp.WithSelectors("otelconsumer"))) - + logger := logptest.NewTestingLogger(t, "") logConsumer, err := consumer.NewLogs(consumeFn) assert.NoError(t, err) consumer := &otelConsumer{ observer: outputs.NewNilObserver(), logsConsumer: logConsumer, beatInfo: beat.Info{}, - log: logp.NewLogger("otelconsumer"), + log: logger.Named("otelconsumer"), } return consumer } diff --git a/x-pack/libbeat/persistentcache/persistentcache.go b/x-pack/libbeat/persistentcache/persistentcache.go index f02372b0dfb..c7fddf0379e 100644 --- a/x-pack/libbeat/persistentcache/persistentcache.go +++ b/x-pack/libbeat/persistentcache/persistentcache.go @@ -46,8 +46,8 @@ type Options struct { // New creates and returns a new persistent cache. // Cache returned by this method must be closed with Close() when // not needed anymore. -func New(name string, opts Options) (*PersistentCache, error) { - logger := logp.NewLogger("persistentcache") +func New(name string, opts Options, logger *logp.Logger) (*PersistentCache, error) { + logger = logger.Named("persistentcache") rootPath := opts.RootPath if rootPath == "" { @@ -97,7 +97,7 @@ func (c *PersistentCache) Get(k string, v interface{}) error { return err } if c.refreshOnAccess && c.timeout > 0 { - c.store.Set([]byte(k), d, c.timeout) + c.store.Set([]byte(k), d, c.timeout) //nolint:errcheck //best-effort TTL refresh on access; read result is still returned } err = c.codec.Decode(d, v) if err != nil { diff --git a/x-pack/libbeat/persistentcache/persistentcache_test.go b/x-pack/libbeat/persistentcache/persistentcache_test.go index 7a3b61b8cb3..f64776d2428 100644 --- a/x-pack/libbeat/persistentcache/persistentcache_test.go +++ b/x-pack/libbeat/persistentcache/persistentcache_test.go @@ -6,7 +6,7 @@ package persistentcache import ( "fmt" - "math/rand" + "math/rand/v2" "path/filepath" "strconv" "testing" @@ -17,13 +17,14 @@ import ( 
"github.com/stretchr/testify/require" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/logp/logptest" ) func TestPutGet(t *testing.T) { - logp.TestingSetup() t.Parallel() + logger := logptest.NewTestingLogger(t, "") - cache, err := New("test", testOptions(t)) + cache, err := New("test", testOptions(t), logger) require.NoError(t, err) defer cache.Close() @@ -47,12 +48,12 @@ func TestPutGet(t *testing.T) { } func TestPersist(t *testing.T) { - logp.TestingSetup() + logger := logptest.NewTestingLogger(t, "") t.Parallel() options := testOptions(t) - cache, err := New("test", options) + cache, err := New("test", options, logger) require.NoError(t, err) type valueType struct { @@ -67,8 +68,7 @@ func TestPersist(t *testing.T) { err = cache.Close() assert.NoError(t, err) - - cache, err = New("test", options) + cache, err = New("test", options, logger) require.NoError(t, err) defer cache.Close() @@ -82,11 +82,12 @@ func TestExpired(t *testing.T) { if testing.Short() { t.Skip("skipping in short mode") } - logp.TestingSetup() t.Parallel() options := testOptions(t) - cache, err := New("test", options) + + logger := logptest.NewTestingLogger(t, "") + cache, err := New("test", options, logger) require.NoError(t, err) defer cache.Close() @@ -118,7 +119,6 @@ func TestRefreshOnAccess(t *testing.T) { t.Skip("skipping in short mode") } - logp.TestingSetup() t.Parallel() // Badger TTL is not reliable on sub-second durations. 
@@ -126,7 +126,8 @@ func TestRefreshOnAccess(t *testing.T) { options.Timeout = 2 * time.Second options.RefreshOnAccess = true - cache, err := New("test", options) + logger := logptest.NewTestingLogger(t, "") + cache, err := New("test", options, logger) require.NoError(t, err) defer cache.Close() @@ -167,10 +168,12 @@ func BenchmarkPut(b *testing.B) { Put(key string, value interface{}) error Close() error } + logger, err := logp.NewDevelopmentLogger("") + require.NoError(b, err) options := testOptions(b) newPersistentCache := func(tb testing.TB, name string) cache { - cache, err := New(name, options) + cache, err := New(name, options, logger) require.NoError(tb, err) return cache } @@ -292,9 +295,11 @@ func BenchmarkOpen(b *testing.B) { Close() error } + logger, err := logp.NewDevelopmentLogger("") + require.NoError(b, err) options := testOptions(b) newPersistentCache := func(tb testing.TB, name string) cache { - cache, err := New(name, options) + cache, err := New(name, options, logger) require.NoError(tb, err) return cache } @@ -345,9 +350,11 @@ func BenchmarkGet(b *testing.B) { Close() error } + logger, err := logp.NewDevelopmentLogger("") + require.NoError(b, err) options := testOptions(b) newPersistentCache := func(tb testing.TB, name string) cache { - cache, err := New(name, options) + cache, err := New(name, options, logger) require.NoError(tb, err) return cache } @@ -390,7 +397,7 @@ func BenchmarkGet(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - expected := objects[rand.Intn(size)] + expected := objects[rand.IntN(size)] //nolint:errcheck // benchmarks cache.Get(expected.ID, &result) if expected.ID != result.ID { diff --git a/x-pack/libbeat/persistentcache/store_test.go b/x-pack/libbeat/persistentcache/store_test.go index 9ff31d2a179..7d738945cbb 100644 --- a/x-pack/libbeat/persistentcache/store_test.go +++ b/x-pack/libbeat/persistentcache/store_test.go @@ -5,29 +5,22 @@ package persistentcache import ( - "io/ioutil" - "os" "testing" 
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/logp/logptest" ) func TestStandaloneStore(t *testing.T) { - type valueType struct { - Something string - } var key = []byte("somekey") var value = []byte("somevalue") - tempDir, err := ioutil.TempDir("", "beat-data-dir-") - require.NoError(t, err) - t.Cleanup(func() { os.RemoveAll(tempDir) }) + tempDir := t.TempDir() - store, err := newStore(logp.NewLogger("test"), tempDir, "store-cache") + store, err := newStore(logptest.NewTestingLogger(t, ""), tempDir, "store-cache") require.NoError(t, err) err = store.Set(key, value, 0)