Skip to content
13 changes: 7 additions & 6 deletions filebeat/beater/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@ func newCrawler(
inputConfigs []*conf.C,
beatDone chan struct{},
once bool,
logger *logp.Logger,
) (*crawler, error) {
return &crawler{
log: logp.NewLogger("crawler"),
log: logger.Named("crawler"),
inputs: map[uint64]cfgfile.Runner{},
inputsFactory: inputFactory,
modulesFactory: module,
Expand Down Expand Up @@ -79,14 +80,14 @@ func (c *crawler) Start(
}

if configInputs.Enabled() {
c.inputReloader = cfgfile.NewReloader(pipeline, configInputs)
c.inputReloader = cfgfile.NewReloader(log, pipeline, configInputs)
if err := c.inputReloader.Check(c.inputsFactory); err != nil {
return fmt.Errorf("creating input reloader failed: %w", err)
}
}

if configModules.Enabled() {
c.modulesReloader = cfgfile.NewReloader(pipeline, configModules)
c.modulesReloader = cfgfile.NewReloader(log, pipeline, configModules)
if err := c.modulesReloader.Check(c.modulesFactory); err != nil {
return fmt.Errorf("creating module reloader failed: %w", err)
}
Expand Down Expand Up @@ -152,7 +153,7 @@ func (c *crawler) startInput(
}

func (c *crawler) Stop() {
logp.Info("Stopping Crawler")
c.log.Info("Stopping Crawler")

asyncWaitStop := func(stop func()) {
c.wg.Add(1)
Expand All @@ -162,7 +163,7 @@ func (c *crawler) Stop() {
}()
}

logp.Info("Stopping %d inputs", len(c.inputs))
c.log.Infof("Stopping %d inputs", len(c.inputs))
// Stop inputs in parallel
for id, p := range c.inputs {
id, p := id, p
Expand All @@ -182,7 +183,7 @@ func (c *crawler) Stop() {

c.WaitForCompletion()

logp.Info("Crawler stopped")
c.log.Info("Crawler stopped")
}

func (c *crawler) WaitForCompletion() {
Expand Down
98 changes: 33 additions & 65 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,11 @@ import (
"strings"
"sync"

"github.com/elastic/beats/v7/filebeat/backup"
"github.com/elastic/beats/v7/filebeat/channel"
cfg "github.com/elastic/beats/v7/filebeat/config"
"github.com/elastic/beats/v7/filebeat/fileset"
_ "github.com/elastic/beats/v7/filebeat/include"
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/filebeat/input/filestream/takeover"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/filebeat/input/v2/compat"
"github.com/elastic/beats/v7/filebeat/registrar"
Expand All @@ -49,7 +47,6 @@ import (
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-libs/paths"
"github.com/elastic/go-concert/unison"

// Add filebeat level processors
Expand All @@ -75,6 +72,7 @@ type Filebeat struct {
done chan struct{}
stopOnce sync.Once // wraps the Stop() method
pipeline beat.PipelineConnector
logger *logp.Logger
}

type PluginFactory func(beat.Info, *logp.Logger, statestore.States) []v2.Plugin
Expand All @@ -89,7 +87,7 @@ func New(plugins PluginFactory) beat.Creator {
func newBeater(b *beat.Beat, plugins PluginFactory, rawConfig *conf.C) (beat.Beater, error) {
config := cfg.DefaultConfig
if err := rawConfig.Unpack(&config); err != nil {
return nil, fmt.Errorf("Error reading config file: %w", err)
return nil, fmt.Errorf("Error reading config file: %w", err) //nolint:staticcheck //Keep old behavior
}

if err := cfgwarn.CheckRemoved6xSettings(
Expand Down Expand Up @@ -124,17 +122,17 @@ func newBeater(b *beat.Beat, plugins PluginFactory, rawConfig *conf.C) (beat.Bea
}

if b.API != nil {
if err = inputmon.AttachHandler(b.API.Router(), b.Info.Monitoring.NamespaceRegistry()); err != nil {
if err = inputmon.AttachHandler(b.API.Router(), nil); err != nil {
return nil, fmt.Errorf("failed attach inputs api to monitoring endpoint server: %w", err)
}
}

if b.Manager != nil {
b.Manager.RegisterDiagnosticHook("input_metrics", "Metrics from active inputs.",
"input_metrics.json", "application/json", func() []byte {
data, err := inputmon.MetricSnapshotJSON(b.Info.Monitoring.NamespaceRegistry())
data, err := inputmon.MetricSnapshotJSON(nil)
if err != nil {
logp.L().Warnw("Failed to collect input metric snapshot for Agent diagnostics.", "error", err)
b.Info.Logger.Warnw("Failed to collect input metric snapshot for Agent diagnostics.", "error", err)
return []byte(err.Error())
}
return data
Expand Down Expand Up @@ -163,11 +161,11 @@ func newBeater(b *beat.Beat, plugins PluginFactory, rawConfig *conf.C) (beat.Bea
}

// in the `setup` command, log this only as a warning
logp.Warn("Setup called, but no modules enabled.")
b.Info.Logger.Warn("Setup called, but no modules enabled.")
}

if *once && config.ConfigInput.Enabled() && config.ConfigModules.Enabled() {
return nil, fmt.Errorf("input configs and -once cannot be used together")
return nil, fmt.Errorf("input configs and --once cannot be used together")
}

if config.IsInputEnabled("stdin") && len(enabledInputs) > 1 {
Expand All @@ -179,6 +177,7 @@ func newBeater(b *beat.Beat, plugins PluginFactory, rawConfig *conf.C) (beat.Bea
config: &config,
moduleRegistry: moduleRegistry,
pluginFactory: plugins,
logger: b.Info.Logger,
}

err = fb.setupPipelineLoaderCallback(b)
Expand All @@ -192,7 +191,7 @@ func newBeater(b *beat.Beat, plugins PluginFactory, rawConfig *conf.C) (beat.Bea
// setupPipelineLoaderCallback sets the callback function for loading pipelines during setup.
func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error {
if b.Config.Output.Name() != "elasticsearch" && !b.Manager.Enabled() {
logp.Warn(pipelinesWarning)
fb.logger.Warn(pipelinesWarning)
return nil
}

Expand Down Expand Up @@ -225,7 +224,7 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error {
newPath := strings.TrimSuffix(origPath, ".yml")
_ = fb.config.ConfigModules.SetString("path", -1, newPath)
}
modulesLoader := cfgfile.NewReloader(fb.pipeline, fb.config.ConfigModules)
modulesLoader := cfgfile.NewReloader(fb.logger.Named("module.reloader"), fb.pipeline, fb.config.ConfigModules)
modulesLoader.Load(modulesFactory)
}

Expand All @@ -238,7 +237,7 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error {
// setup.
func (fb *Filebeat) loadModulesPipelines(b *beat.Beat) error {
if b.Config.Output.Name() != "elasticsearch" {
logp.Warn(pipelinesWarning)
fb.logger.Warn(pipelinesWarning)
return nil
}

Expand Down Expand Up @@ -290,7 +289,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {

registryMigrator := registrar.NewMigrator(config.Registry)
if err := registryMigrator.Run(); err != nil {
logp.Err("Failed to migrate registry file: %+v", err)
fb.logger.Errorf("Failed to migrate registry file: %+v", err)
return err
}

Expand All @@ -301,9 +300,9 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
cn()
}()

stateStore, err := openStateStore(ctx, b.Info, logp.NewLogger("filebeat"), config.Registry)
stateStore, err := openStateStore(ctx, b.Info, fb.logger.Named("filebeat"), config.Registry)
if err != nil {
logp.Err("Failed to open state store: %+v", err)
fb.logger.Errorf("Failed to open state store: %+v", err)
return err
}
defer stateStore.Close()
Expand All @@ -315,7 +314,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
b.OutputConfigReloader = reload.ReloadableFunc(func(r *reload.ConfigWithMeta) error {
outCfg := conf.Namespace{}
if err := r.Config.Unpack(&outCfg); err != nil || outCfg.Name() != "elasticsearch" {
logp.Err("Failed to unpack the output config: %v", err)
fb.logger.Errorf("Failed to unpack the output config: %v", err)
return nil
}

Expand All @@ -324,24 +323,18 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
// See https://github.com/elastic/beats/issues/42815
configCopy, err := conf.NewConfigFrom(outCfg.Config())
if err != nil {
logp.Err("Failed to create a new config from the output config: %v", err)
fb.logger.Errorf("Failed to create a new config from the output config: %v", err)
return nil
}
stateStore.notifier.Notify(configCopy)
return nil
})
}

err = processLogInputTakeOver(stateStore, config)
if err != nil {
logp.Err("Failed to attempt filestream state take over: %+v", err)
return err
}

// Setup registrar to persist state
registrar, err := registrar.New(stateStore, finishedLogger, config.Registry.FlushTimeout)
if err != nil {
logp.Err("Could not init registrar: %v", err)
fb.logger.Errorf("Could not init registrar: %v", err)
return err
}

Expand All @@ -366,7 +359,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
outDone := make(chan struct{}) // outDone closes down all active pipeline connections
pipelineConnector := channel.NewOutletFactory(outDone).Create

inputsLogger := logp.NewLogger("input")
inputsLogger := fb.logger.Named("input")
v2Inputs := fb.pluginFactory(b.Info, inputsLogger, stateStore)
v2InputLoader, err := v2.NewLoader(inputsLogger, v2Inputs, "type", cfg.DefaultType)
if err != nil {
Expand All @@ -380,7 +373,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {

// Store needs to be fully configured at this point
if err := v2InputLoader.Init(&inputTaskGroup); err != nil {
logp.Err("Failed to initialize the input managers: %v", err)
fb.logger.Errorf("Failed to initialize the input managers: %v", err)
return err
}

Expand All @@ -401,12 +394,12 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
if b.Config.Output.Name() == "elasticsearch" {
pipelineLoaderFactory = newPipelineLoaderFactory(pipelineFactoryCtx, b.Config.Output.Config())
} else {
logp.Warn(pipelinesWarning)
fb.logger.Warn(pipelinesWarning)
}
moduleLoader := fileset.NewFactory(inputLoader, b.Info, pipelineLoaderFactory, config.OverwritePipelines)
crawler, err := newCrawler(inputLoader, moduleLoader, config.Inputs, fb.done, *once)
crawler, err := newCrawler(inputLoader, moduleLoader, config.Inputs, fb.done, *once, fb.logger)
if err != nil {
logp.Err("Could not init crawler: %v", err)
fb.logger.Errorf("Could not init crawler: %v", err)
return err
}

Expand All @@ -417,7 +410,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
// Start the registrar
err = registrar.Start()
if err != nil {
return fmt.Errorf("Could not start registrar: %w", err)
return fmt.Errorf("Could not start registrar: %w", err) //nolint:staticcheck //Keep old behavior
}

// Stopping registrar will write last state
Expand All @@ -435,31 +428,31 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
defer waitEvents.Wait()

if config.OverwritePipelines {
logp.Debug("modules", "Existing Ingest pipelines will be updated")
fb.logger.Debug("modules", "Existing Ingest pipelines will be updated")
}

err = crawler.Start(fb.pipeline, config.ConfigInput, config.ConfigModules)
if err != nil {
crawler.Stop()
cancelPipelineFactoryCtx()
return fmt.Errorf("Failed to start crawler: %w", err)
return fmt.Errorf("Failed to start crawler: %w", err) //nolint:staticcheck //Keep old behavior
}

// If run once, add crawler completion check as alternative to done signal
if *once {
runOnce := func() {
logp.Info("Running filebeat once. Waiting for completion ...")
fb.logger.Info("Running filebeat once. Waiting for completion ...")
crawler.WaitForCompletion()
logp.Info("All data collection completed. Shutting down.")
fb.logger.Info("All data collection completed. Shutting down.")
}
waitFinished.Add(runOnce)
}

// Register reloadable list of inputs and modules
inputs := cfgfile.NewRunnerList(management.DebugK, inputLoader, fb.pipeline)
inputs := cfgfile.NewRunnerList(management.DebugK, inputLoader, fb.pipeline, fb.logger)
b.Registry.MustRegisterInput(inputs)

modules := cfgfile.NewRunnerList(management.DebugK, moduleLoader, fb.pipeline)
modules := cfgfile.NewRunnerList(management.DebugK, moduleLoader, fb.pipeline, fb.logger)

var adiscover *autodiscover.Autodiscover
if fb.config.Autodiscover != nil {
Expand All @@ -473,6 +466,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
autodiscover.QueryConfig(),
config.Autodiscover,
b.Keystore,
fb.logger,
)
if err != nil {
return err
Expand Down Expand Up @@ -508,7 +502,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
"Continue shutdown: All enqueued events being published."))
// Wait for either timeout or all events having been ACKed by outputs.
if fb.config.ShutdownTimeout > 0 {
logp.Info("Shutdown output timer started. Waiting for max %v.", timeout)
fb.logger.Info("Shutdown output timer started. Waiting for max %v.", timeout)
waitEvents.Add(withLog(waitDuration(timeout),
"Continue shutdown: Time out waiting for events being published."))
} else {
Expand All @@ -534,7 +528,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {

// Stop is called on exit to stop the crawling, spooling and registration processes.
func (fb *Filebeat) Stop() {
logp.Info("Stopping filebeat")
fb.logger.Info("Stopping filebeat")

// Stop Filebeat
fb.stopOnce.Do(func() { close(fb.done) })
Expand All @@ -545,39 +539,13 @@ func newPipelineLoaderFactory(ctx context.Context, esConfig *conf.C) fileset.Pip
pipelineLoaderFactory := func() (fileset.PipelineLoader, error) {
esClient, err := eslegclient.NewConnectedClient(ctx, esConfig, "Filebeat")
if err != nil {
return nil, fmt.Errorf("Error creating Elasticsearch client: %w", err)
return nil, fmt.Errorf("Error creating Elasticsearch client: %w", err) //nolint:staticcheck //Keep old behavior
}
return esClient, nil
}
return pipelineLoaderFactory
}

// some of the filestreams might want to take over the loginput state
// if their `take_over` flag is set to `true`.
func processLogInputTakeOver(stateStore statestore.States, config *cfg.Config) error {
inputs, err := fetchInputConfiguration(config)
if err != nil {
return fmt.Errorf("Failed to fetch input configuration when attempting take over: %w", err)
}
if len(inputs) == 0 {
return nil
}

store, err := stateStore.StoreFor("")
if err != nil {
return fmt.Errorf("Failed to access state when attempting take over: %w", err)
}
defer store.Close()
logger := logp.NewLogger("filestream-takeover")

registryHome := paths.Resolve(paths.Data, config.Registry.Path)
registryHome = filepath.Join(registryHome, "filebeat")

backuper := backup.NewRegistryBackuper(logger, registryHome)

return takeover.TakeOverLogInputStates(logger, store, backuper, inputs)
}

// fetches all the defined input configuration available at Filebeat startup including external files.
func fetchInputConfiguration(config *cfg.Config) (inputs []*conf.C, err error) {
if len(config.Inputs) == 0 {
Expand Down
2 changes: 1 addition & 1 deletion filebeat/cmd/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func buildModulesManager(beat *beat.Beat) (cmd.ModulesManager, error) {
return nil, fmt.Errorf("wrong settings for config.modules.path, it is expected to end with *.yml. Got: %s", glob)
}

modulesManager, err := cfgfile.NewGlobManager(glob, ".yml", ".disabled")
modulesManager, err := cfgfile.NewGlobManager(glob, ".yml", ".disabled", beat.Info.Logger)
if err != nil {
return nil, fmt.Errorf("initialization error: %w", err)
}
Expand Down
Loading
Loading