Skip to content
80 changes: 0 additions & 80 deletions filebeat/autodiscover/autodiscover.go

This file was deleted.

23 changes: 0 additions & 23 deletions filebeat/autodiscover/include.go

This file was deleted.

17 changes: 11 additions & 6 deletions filebeat/beater/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,27 +76,32 @@ func (c *crawler) Start(
for _, inputConfig := range c.inputConfigs {
err := c.startInput(pipeline, inputConfig, r.GetStates())
if err != nil {
return err
return fmt.Errorf("starting input failed: %+v", err)
}
}

if configInputs.Enabled() {
c.inputReloader = cfgfile.NewReloader(pipeline, configInputs)
if err := c.inputReloader.Check(c.inputsFactory); err != nil {
return err
return fmt.Errorf("creating input reloader failed: %+v", err)
}

go func() {
c.inputReloader.Run(c.inputsFactory)
}()
}

if configModules.Enabled() {
c.modulesReloader = cfgfile.NewReloader(pipeline, configModules)
if err := c.modulesReloader.Check(c.modulesFactory); err != nil {
return err
return fmt.Errorf("creating module reloader failed: %+v", err)
}

}

if c.inputReloader != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will c.inputReloader ever be non-nil when crawler.Start is called, or is it always initialized from this function?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be nil if input reloading has not been enabled. I just moved the check down here, so we do not leak the go-routine in case of startup going wrong when the module loader is faulty.

go func() {
c.inputReloader.Run(c.inputsFactory)
}()
}
if c.modulesReloader != nil {
go func() {
c.modulesReloader.Run(c.modulesFactory)
}()
Expand Down
20 changes: 16 additions & 4 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/pkg/errors"

fbautodiscover "github.com/elastic/beats/v7/filebeat/autodiscover"
"github.com/elastic/beats/v7/filebeat/channel"
cfg "github.com/elastic/beats/v7/filebeat/config"
"github.com/elastic/beats/v7/filebeat/fileset"
Expand All @@ -43,9 +42,14 @@ import (
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"

_ "github.com/elastic/beats/v7/filebeat/include"

// Add filebeat level processors
_ "github.com/elastic/beats/v7/filebeat/processor/add_kubernetes_metadata"
_ "github.com/elastic/beats/v7/libbeat/processors/decode_csv_fields"

// include all filebeat specific builders
_ "github.com/elastic/beats/v7/filebeat/autodiscover/builder/hints"
)

const pipelinesWarning = "Filebeat is unable to load the Ingest Node pipelines for the configured" +
Expand Down Expand Up @@ -282,7 +286,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
err = crawler.Start(b.Publisher, registrar, config.ConfigInput, config.ConfigModules)
if err != nil {
crawler.Stop()
return err
return fmt.Errorf("Failed to start crawler: %+v", err)
}

// If run once, add crawler completion check as alternative to done signal
Expand All @@ -304,8 +308,16 @@ func (fb *Filebeat) Run(b *beat.Beat) error {

var adiscover *autodiscover.Autodiscover
if fb.config.Autodiscover != nil {
adapter := fbautodiscover.NewAutodiscoverAdapter(inputLoader, moduleLoader)
adiscover, err = autodiscover.NewAutodiscover("filebeat", b.Publisher, adapter, config.Autodiscover)
adiscover, err = autodiscover.NewAutodiscover(
"filebeat",
b.Publisher,
cfgfile.MultiplexedRunnerFactory(
cfgfile.MatchHasField("module", moduleLoader),
cfgfile.MatchDefault(inputLoader),
),
autodiscover.QueryConfig(),
config.Autodiscover,
)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion filebeat/channel/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type ConnectorFunc func(*common.Config, beat.ClientConfig) (Outleter, error)

type pipelineConnector struct {
parent *OutletFactory
pipeline beat.Pipeline
pipeline beat.PipelineConnector
}

// Connect passes the cfg and the zero value of beat.ClientConfig to the underlying function.
Expand Down
2 changes: 1 addition & 1 deletion filebeat/channel/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func NewOutletFactory(
// Inputs and all harvesters use the same pipeline client instance.
// This guarantees ordering between events as required by the registrar for
// file.State updates
func (f *OutletFactory) Create(p beat.Pipeline) Connector {
func (f *OutletFactory) Create(p beat.PipelineConnector) Connector {
return &pipelineConnector{parent: f, pipeline: p}
}

Expand Down
2 changes: 1 addition & 1 deletion filebeat/channel/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
)

// Factory is used to create a new Outlet instance
type Factory func(beat.Pipeline) Connector
type Factory func(beat.PipelineConnector) Connector

// Connector creates an Outlet connecting the event publishing with some internal pipeline.
// type Connector func(*common.Config, *common.MapStrPointer) (Outleter, error)
Expand Down
9 changes: 8 additions & 1 deletion filebeat/fileset/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
pubpipeline "github.com/elastic/beats/v7/libbeat/publisher/pipeline"

"github.com/mitchellh/hashstructure"
)
Expand Down Expand Up @@ -75,7 +76,7 @@ func NewFactory(
}

// Create creates a module based on a config
func (f *Factory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) {
func (f *Factory) Create(p beat.PipelineConnector, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) {
// Start a registry of one module:
m, err := NewModuleRegistry([]*common.Config{c}, f.beatInfo, false)
if err != nil {
Expand Down Expand Up @@ -114,6 +115,11 @@ func (f *Factory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrP
}, nil
}

func (f *Factory) CheckConfig(c *common.Config) error {
_, err := f.Create(pubpipeline.NewNilPipeline(), c, nil)
return err
}

func (p *inputsRunner) Start() {
// Load pipelines
if p.pipelineLoaderFactory != nil {
Expand Down Expand Up @@ -153,6 +159,7 @@ func (p *inputsRunner) Start() {
moduleList.Add(m)
}
}

func (p *inputsRunner) Stop() {
if p.pipelineCallbackID != uuid.Nil {
elasticsearch.DeregisterConnectCallback(p.pipelineCallbackID)
Expand Down
8 changes: 7 additions & 1 deletion filebeat/fileset/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
pubpipeline "github.com/elastic/beats/v7/libbeat/publisher/pipeline"
)

// SetupFactory is for loading module assets when running setup subcommand.
Expand All @@ -41,7 +42,7 @@ func NewSetupFactory(beatInfo beat.Info, pipelineLoaderFactory PipelineLoaderFac
}

// Create creates a new SetupCfgRunner to setup module configuration.
func (sf *SetupFactory) Create(_ beat.Pipeline, c *common.Config, _ *common.MapStrPointer) (cfgfile.Runner, error) {
func (sf *SetupFactory) Create(_ beat.PipelineConnector, c *common.Config, _ *common.MapStrPointer) (cfgfile.Runner, error) {
m, err := NewModuleRegistry([]*common.Config{c}, sf.beatInfo, false)
if err != nil {
return nil, err
Expand All @@ -54,6 +55,11 @@ func (sf *SetupFactory) Create(_ beat.Pipeline, c *common.Config, _ *common.MapS
}, nil
}

func (sf *SetupFactory) CheckConfig(c *common.Config) error {
_, err := sf.Create(pubpipeline.NewNilPipeline(), c, nil)
return err
}

// SetupCfgRunner is for loading assets of modules.
type SetupCfgRunner struct {
moduleRegistry *ModuleRegistry
Expand Down
8 changes: 7 additions & 1 deletion filebeat/input/runnerfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/publisher/pipeline"
)

// RunnerFactory is a factory for registrars
Expand All @@ -43,7 +44,7 @@ func NewRunnerFactory(outlet channel.Factory, registrar *registrar.Registrar, be

// Create creates a input based on a config
func (r *RunnerFactory) Create(
pipeline beat.Pipeline,
pipeline beat.PipelineConnector,
c *common.Config,
meta *common.MapStrPointer,
) (cfgfile.Runner, error) {
Expand All @@ -56,3 +57,8 @@ func (r *RunnerFactory) Create(

return p, nil
}

func (r *RunnerFactory) CheckConfig(cfg *common.Config) error {
_, err := r.Create(pipeline.NewNilPipeline(), cfg, nil)
return err
}
23 changes: 0 additions & 23 deletions heartbeat/autodiscover/include.go

This file was deleted.

9 changes: 1 addition & 8 deletions heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,7 @@ func (bt *Heartbeat) RunReloadableMonitors(b *beat.Beat) (err error) {

// makeAutodiscover creates an autodiscover object ready to be started.
func (bt *Heartbeat) makeAutodiscover(b *beat.Beat) (*autodiscover.Autodiscover, error) {
adapter := autodiscover.NewFactoryAdapter(bt.dynamicFactory)

ad, err := autodiscover.NewAutodiscover("heartbeat", b.Publisher, adapter, bt.config.Autodiscover)
if err != nil {
return nil, err
}

return ad, nil
return autodiscover.NewAutodiscover("heartbeat", b.Publisher, bt.dynamicFactory, autodiscover.QueryConfig(), bt.config.Autodiscover)
}

// Stop stops the beat.
Expand Down
4 changes: 3 additions & 1 deletion heartbeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ package cmd
import (
"fmt"

_ "github.com/elastic/beats/v7/heartbeat/autodiscover"
"github.com/elastic/beats/v7/heartbeat/beater"

// include all heartbeat specific autodiscovery builders
_ "github.com/elastic/beats/v7/heartbeat/autodiscover/builder/hints"

// register default heartbeat monitors
_ "github.com/elastic/beats/v7/heartbeat/monitors/defaults"
cmd "github.com/elastic/beats/v7/libbeat/cmd"
Expand Down
2 changes: 1 addition & 1 deletion heartbeat/monitors/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func NewFactory(sched *scheduler.Scheduler, allowWatches bool) *RunnerFactory {
}

// Create makes a new Runner for a new monitor with the given Config.
func (f *RunnerFactory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) {
func (f *RunnerFactory) Create(p beat.PipelineConnector, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) {
monitor, err := newMonitor(c, globalPluginsReg, p, f.sched, f.allowWatches, meta)
return monitor, err
}
Expand Down
Loading