From 626a4dfc83c75760dfcf615f604b411c50e49665 Mon Sep 17 00:00:00 2001 From: urso Date: Sat, 29 Feb 2020 14:58:37 +0100 Subject: [PATCH 01/14] Center more functionality around RunnerFactory Remove CheckableRunnerFactory and require RunnerFactory to implement CheckConfig. CheckableRunnerFactory more and more superseded RunnerFactory. As we want more config validation support in the future as well I combined the two into RunnerFactory Remove autodiscover.Adapter. The adapter did inherit from CheckableRunnerFactory, giving us some inheritance chain between RunnerFactory, CheckableRunnerFactory, and Adapter for autodiscovery. By removig the Adapter and CheckableRunnerFactory we have one common type (RunnerFactory) to integrate with config file reloading, static input/module setup, and autodiscovery. Add selectors for autodiscovery event selection that are used as additional parameters when creating a new Autodiscover instance. This gives us some more composability, yet I wonder if we can even remove those, as every instance of NewAutodiscover did look for events with a 'config' field of type []*common.Config. --- filebeat/autodiscover/autodiscover.go | 80 ----------------------- filebeat/autodiscover/include.go | 23 ------- filebeat/beater/filebeat.go | 19 +++++- filebeat/fileset/factory.go | 5 ++ filebeat/fileset/setup.go | 4 ++ filebeat/input/runnerfactory.go | 4 ++ heartbeat/autodiscover/include.go | 23 ------- heartbeat/beater/heartbeat.go | 15 ++--- heartbeat/cmd/root.go | 4 +- libbeat/autodiscover/autodiscover.go | 42 +++++++----- libbeat/autodiscover/autodiscover_test.go | 2 +- libbeat/autodiscover/eventselect.go | 68 +++++++++++++++++++ libbeat/autodiscover/factoryadapter.go | 63 ------------------ libbeat/cfgfile/factories.go | 63 ++++++++++++++++++ libbeat/cfgfile/reload.go | 14 ++-- metricbeat/autodiscover/include.go | 26 -------- metricbeat/beater/metricbeat.go | 17 +++-- 17 files changed, 215 insertions(+), 257 deletions(-) delete mode 100644 filebeat/autodiscover/autodiscover.go delete mode 100644 filebeat/autodiscover/include.go delete mode 100644 heartbeat/autodiscover/include.go create mode 100644 libbeat/autodiscover/eventselect.go delete mode 100644 libbeat/autodiscover/factoryadapter.go create mode 100644 libbeat/cfgfile/factories.go delete mode 100644 metricbeat/autodiscover/include.go diff --git a/filebeat/autodiscover/autodiscover.go b/filebeat/autodiscover/autodiscover.go deleted file mode 100644 index 85fa6cf65bd2..000000000000 --- a/filebeat/autodiscover/autodiscover.go +++ /dev/null @@ -1,80 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package autodiscover - -import ( - "errors" - - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/cfgfile" - "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/common/bus" -) - -// AutodiscoverAdapter for Filebeat modules & input -type AutodiscoverAdapter struct { - inputFactory cfgfile.RunnerFactory - moduleFactory cfgfile.RunnerFactory -} - -// NewAutodiscoverAdapter builds and returns an autodiscover adapter for Filebeat modules & input -func NewAutodiscoverAdapter(inputFactory, moduleFactory cfgfile.RunnerFactory) *AutodiscoverAdapter { - return &AutodiscoverAdapter{ - inputFactory: inputFactory, - moduleFactory: moduleFactory, - } -} - -// CreateConfig generates a valid list of configs from the given event, the received event will have all keys defined by `StartFilter` -func (m *AutodiscoverAdapter) CreateConfig(e bus.Event) ([]*common.Config, error) { - config, ok := e["config"].([]*common.Config) - if !ok { - return nil, errors.New("Got a wrong value in event `config` key") - } - return config, nil -} - -// CheckConfig tests given config to check if it will work or not, returns errors in case it won't work -func (m *AutodiscoverAdapter) CheckConfig(c *common.Config) error { - var factory cfgfile.RunnerFactory - - if c.HasField("module") { - factory = m.moduleFactory - } else { - factory = m.inputFactory - } - - if checker, ok := factory.(cfgfile.ConfigChecker); ok { - return checker.CheckConfig(c) - } - - return nil -} - -// Create a module or input from the given config -func (m *AutodiscoverAdapter) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { - if c.HasField("module") { - return m.moduleFactory.Create(p, c, meta) - } - return m.inputFactory.Create(p, c, meta) -} - -// EventFilter returns the bus filter to retrieve runner start/stop triggering events -func (m *AutodiscoverAdapter) EventFilter() []string { - return []string{"config"} -} diff --git a/filebeat/autodiscover/include.go b/filebeat/autodiscover/include.go deleted file mode 100644 index de15185e7166..000000000000 --- a/filebeat/autodiscover/include.go +++ /dev/null @@ -1,23 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package autodiscover - -import ( - // include all filebeat specific builders - _ "github.com/elastic/beats/v7/filebeat/autodiscover/builder/hints" -) diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index e368cd89615d..5e82c53f7a4e 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -24,7 +24,6 @@ import ( "github.com/pkg/errors" - fbautodiscover "github.com/elastic/beats/v7/filebeat/autodiscover" "github.com/elastic/beats/v7/filebeat/channel" cfg "github.com/elastic/beats/v7/filebeat/config" "github.com/elastic/beats/v7/filebeat/fileset" @@ -43,9 +42,14 @@ import ( "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" + _ "github.com/elastic/beats/v7/filebeat/include" + // Add filebeat level processors _ "github.com/elastic/beats/v7/filebeat/processor/add_kubernetes_metadata" _ "github.com/elastic/beats/v7/libbeat/processors/decode_csv_fields" + + // include all filebeat specific builders + _ "github.com/elastic/beats/v7/filebeat/autodiscover/builder/hints" ) const pipelinesWarning = "Filebeat is unable to load the Ingest Node pipelines for the configured" + @@ -304,8 +308,17 @@ func (fb *Filebeat) Run(b *beat.Beat) error { var adiscover *autodiscover.Autodiscover if fb.config.Autodiscover != nil { - adapter := fbautodiscover.NewAutodiscoverAdapter(inputLoader, moduleLoader) - adiscover, err = autodiscover.NewAutodiscover("filebeat", b.Publisher, adapter, config.Autodiscover) + adiscover, err = autodiscover.NewAutodiscover( + "filebeat", + b.Publisher, + cfgfile.MultiplexedRunnerFactory( + cfgfile.MatchHasField("module", moduleLoader), + cfgfile.MatchDefault(inputLoader), + ), + autodiscover.ConfigEvents(), + autodiscover.QueryConfig(), + config.Autodiscover, + ) if err != nil { return err } diff --git a/filebeat/fileset/factory.go b/filebeat/fileset/factory.go index 92e626795c92..70cde2079a53 100644 --- a/filebeat/fileset/factory.go +++ b/filebeat/fileset/factory.go @@ -114,6 +114,10 @@ func (f *Factory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrP }, nil } +func (f *Factory) CheckConfig(c *common.Config) error { + return nil +} + func (p *inputsRunner) Start() { // Load pipelines if p.pipelineLoaderFactory != nil { @@ -153,6 +157,7 @@ func (p *inputsRunner) Start() { moduleList.Add(m) } } + func (p *inputsRunner) Stop() { if p.pipelineCallbackID != uuid.Nil { elasticsearch.DeregisterConnectCallback(p.pipelineCallbackID) diff --git a/filebeat/fileset/setup.go b/filebeat/fileset/setup.go index 4c26dfb47e31..af02d8148561 100644 --- a/filebeat/fileset/setup.go +++ b/filebeat/fileset/setup.go @@ -54,6 +54,10 @@ func (sf *SetupFactory) Create(_ beat.Pipeline, c *common.Config, _ *common.MapS }, nil } +func (sf *SetupFactory) CheckConfig(c *common.Config) error { + return nil +} + // SetupCfgRunner is for loading assets of modules. type SetupCfgRunner struct { moduleRegistry *ModuleRegistry diff --git a/filebeat/input/runnerfactory.go b/filebeat/input/runnerfactory.go index d41382fe01fb..89534a852697 100644 --- a/filebeat/input/runnerfactory.go +++ b/filebeat/input/runnerfactory.go @@ -56,3 +56,7 @@ func (r *RunnerFactory) Create( return p, nil } + +func (r *RunnerFactory) CheckConfig(_ *common.Config) error { + return nil +} diff --git a/heartbeat/autodiscover/include.go b/heartbeat/autodiscover/include.go deleted file mode 100644 index 2ae869729c49..000000000000 --- a/heartbeat/autodiscover/include.go +++ /dev/null @@ -1,23 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package autodiscover - -import ( - // include all heartbeat specific builders - _ "github.com/elastic/beats/v7/heartbeat/autodiscover/builder/hints" -) diff --git a/heartbeat/beater/heartbeat.go b/heartbeat/beater/heartbeat.go index 90eef546788f..1a09e718d133 100644 --- a/heartbeat/beater/heartbeat.go +++ b/heartbeat/beater/heartbeat.go @@ -156,14 +156,13 @@ func (bt *Heartbeat) RunReloadableMonitors(b *beat.Beat) (err error) { // makeAutodiscover creates an autodiscover object ready to be started. func (bt *Heartbeat) makeAutodiscover(b *beat.Beat) (*autodiscover.Autodiscover, error) { - adapter := autodiscover.NewFactoryAdapter(bt.dynamicFactory) - - ad, err := autodiscover.NewAutodiscover("heartbeat", b.Publisher, adapter, bt.config.Autodiscover) - if err != nil { - return nil, err - } - - return ad, nil + return autodiscover.NewAutodiscover( + "heartbeat", b.Publisher, + bt.dynamicFactory, + autodiscover.ConfigEvents(), + autodiscover.QueryConfig(), + bt.config.Autodiscover, + ) } // Stop stops the beat. diff --git a/heartbeat/cmd/root.go b/heartbeat/cmd/root.go index 858cc2c0daf6..8e24b28976b8 100644 --- a/heartbeat/cmd/root.go +++ b/heartbeat/cmd/root.go @@ -20,9 +20,11 @@ package cmd import ( "fmt" - _ "github.com/elastic/beats/v7/heartbeat/autodiscover" "github.com/elastic/beats/v7/heartbeat/beater" + // include all heartbeat specific autodiscovery builders + _ "github.com/elastic/beats/v7/heartbeat/autodiscover/builder/hints" + // register default heartbeat monitors _ "github.com/elastic/beats/v7/heartbeat/monitors/defaults" cmd "github.com/elastic/beats/v7/libbeat/cmd" diff --git a/libbeat/autodiscover/autodiscover.go b/libbeat/autodiscover/autodiscover.go index 6ceb46fa794f..4805b45af3fc 100644 --- a/libbeat/autodiscover/autodiscover.go +++ b/libbeat/autodiscover/autodiscover.go @@ -39,24 +39,25 @@ const ( // TODO autodiscover providers config reload -// Adapter must be implemented by the beat in order to provide Autodiscover -type Adapter interface { - // CreateConfig generates a valid list of configs from the given event, the received event will have all keys defined by `StartFilter` - CreateConfig(bus.Event) ([]*common.Config, error) - - // RunnerFactory provides runner creation by feeding valid configs - cfgfile.CheckableRunnerFactory - - // EventFilter returns the bus filter to retrieve runner start/stop triggering events +// EventSelector returns the bus filter to retrieve runner start/stop triggering events +type EventSelector interface { EventFilter() []string } +// EventConfigurer generates a valid list of configs from the given event, the +// received event will have all keys defined by `StartFilter` +type EventConfigurer interface { + CreateConfig(bus.Event) ([]*common.Config, error) +} + // Autodiscover process, it takes a beat adapter and user config and runs autodiscover process, spawning // new modules when any configured providers does a match type Autodiscover struct { bus bus.Bus defaultPipeline beat.Pipeline - adapter Adapter + factory cfgfile.RunnerFactory + selector EventSelector + configurer EventConfigurer providers []Provider configs map[string]map[uint64]*reload.ConfigWithMeta runners *cfgfile.RunnerList @@ -66,7 +67,14 @@ type Autodiscover struct { } // NewAutodiscover instantiates and returns a new Autodiscover manager -func NewAutodiscover(name string, pipeline beat.Pipeline, adapter Adapter, config *Config) (*Autodiscover, error) { +func NewAutodiscover( + name string, + pipeline beat.Pipeline, + factory cfgfile.RunnerFactory, + selector EventSelector, + configurer EventConfigurer, + config *Config, +) (*Autodiscover, error) { logger := logp.NewLogger("autodiscover") // Init Event bus @@ -86,9 +94,11 @@ func NewAutodiscover(name string, pipeline beat.Pipeline, adapter Adapter, confi return &Autodiscover{ bus: bus, defaultPipeline: pipeline, - adapter: adapter, + factory: factory, + selector: selector, + configurer: configurer, configs: map[string]map[uint64]*reload.ConfigWithMeta{}, - runners: cfgfile.NewRunnerList("autodiscover", adapter, pipeline), + runners: cfgfile.NewRunnerList("autodiscover", factory, pipeline), providers: providers, meta: meta.NewMap(), logger: logger, @@ -102,7 +112,7 @@ func (a *Autodiscover) Start() { } a.logger.Info("Starting autodiscover manager") - a.listener = a.bus.Subscribe(a.adapter.EventFilter()...) + a.listener = a.bus.Subscribe(a.selector.EventFilter()...) // It is important to start the worker first before starting the producer. // In hosts that have large number of workloads, it is easy to have an initial @@ -175,7 +185,7 @@ func (a *Autodiscover) handleStart(event bus.Event) bool { a.configs[eventID] = map[uint64]*reload.ConfigWithMeta{} } - configs, err := a.adapter.CreateConfig(event) + configs, err := a.configurer.CreateConfig(event) if err != nil { a.logger.Debugf("Could not generate config from event %v: %v", event, err) return false @@ -199,7 +209,7 @@ func (a *Autodiscover) handleStart(event bus.Event) bool { continue } - err = a.adapter.CheckConfig(config) + err = a.factory.CheckConfig(config) if err != nil { a.logger.Error(errors.Wrap(err, fmt.Sprintf("Auto discover config check failed for config '%s', won't start runner", common.DebugString(config, true)))) continue diff --git a/libbeat/autodiscover/autodiscover_test.go b/libbeat/autodiscover/autodiscover_test.go index c466785d4b95..a1d5c8f2286b 100644 --- a/libbeat/autodiscover/autodiscover_test.go +++ b/libbeat/autodiscover/autodiscover_test.go @@ -293,7 +293,7 @@ func TestAutodiscoverHash(t *testing.T) { } // Create autodiscover manager - autodiscover, err := NewAutodiscover("test", nil, &adapter, &config) + autodiscover, err := NewAutodiscover("test", nil, &adapter, &adapter, &adapter, &config) if err != nil { t.Fatal(err) } diff --git a/libbeat/autodiscover/eventselect.go b/libbeat/autodiscover/eventselect.go new file mode 100644 index 000000000000..a8a9171e2f39 --- /dev/null +++ b/libbeat/autodiscover/eventselect.go @@ -0,0 +1,68 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package autodiscover + +import ( + "errors" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/bus" +) + +type selectedEvents []string + +type queryConfigFrom string + +// SelectEvents creates an event selector that will filter +// for the configured fields only. +func SelectEvents(events ...string) EventSelector { + if len(events) == 0 { + events = []string{"config"} + } + return selectedEvents(events) +} + +// ConfigEvents creates an EventSelector that will match "config" events only. +func ConfigEvents() EventSelector { + return SelectEvents("config") +} + +func (s selectedEvents) EventFilter() []string { + return []string(s) +} + +// EventConfigQuery creates an EventConfigurer that tries to cast the given event +// field from from the buf event into a []*common.Config. +func EventConfigQuery(field string) EventConfigurer { + if field == "" { + field = "config" + } + return queryConfigFrom(field) +} + +// QueryConfig extract an array of *common.Config from bus.Event. +// The configurations are expected to be in the 'config' field. +func QueryConfig() EventConfigurer { return EventConfigQuery("config") } + +func (q queryConfigFrom) CreateConfig(e bus.Event) ([]*common.Config, error) { + config, ok := e[string(q)].([]*common.Config) + if !ok { + return nil, errors.New("Got a wrong value in event `config` key") + } + return config, nil +} diff --git a/libbeat/autodiscover/factoryadapter.go b/libbeat/autodiscover/factoryadapter.go deleted file mode 100644 index 64dc9f15ca74..000000000000 --- a/libbeat/autodiscover/factoryadapter.go +++ /dev/null @@ -1,63 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package autodiscover - -import ( - "errors" - - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/cfgfile" - "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/common/bus" -) - -// FactoryAdapter is an adapter that works with any cfgfile.RunnerFactory. -type FactoryAdapter struct { - factory cfgfile.CheckableRunnerFactory -} - -// NewFactoryAdapter builds and returns an autodiscover adapter that works with any cfgfile.RunnerFactory. -func NewFactoryAdapter(factory cfgfile.CheckableRunnerFactory) *FactoryAdapter { - return &FactoryAdapter{ - factory: factory, - } -} - -// CreateConfig generates a valid list of configs from the given event, the received event will have all keys defined by `StartFilter` -func (m *FactoryAdapter) CreateConfig(e bus.Event) ([]*common.Config, error) { - config, ok := e["config"].([]*common.Config) - if !ok { - return nil, errors.New("Got a wrong value in event `config` key") - } - return config, nil -} - -// CheckConfig tests given config to check if it will work or not, returns errors in case it won't work -func (m *FactoryAdapter) CheckConfig(c *common.Config) error { - return m.factory.CheckConfig(c) -} - -// Create a module or prospector from the given config -func (m *FactoryAdapter) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { - return m.factory.Create(p, c, meta) -} - -// EventFilter returns the bus filter to retrieve runner start/stop triggering events -func (m *FactoryAdapter) EventFilter() []string { - return []string{"config"} -} diff --git a/libbeat/cfgfile/factories.go b/libbeat/cfgfile/factories.go new file mode 100644 index 000000000000..907de8a24784 --- /dev/null +++ b/libbeat/cfgfile/factories.go @@ -0,0 +1,63 @@ +package cfgfile + +import ( + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" +) + +type multiplexedFactory []FactoryMatcher + +type FactoryMatcher func(cfg *common.Config) RunnerFactory + +var errConfigDoesNotMatch = errors.New("config does not match accepted configurations") + +func MultiplexedRunnerFactory(matchers ...FactoryMatcher) RunnerFactory { + return multiplexedFactory(matchers) +} + +func MatchHasField(field string, factory RunnerFactory) FactoryMatcher { + return func(cfg *common.Config) RunnerFactory { + if cfg.HasField(field) { + return factory + } + return nil + } +} + +func MatchDefault(factory RunnerFactory) FactoryMatcher { + return func(cfg *common.Config) RunnerFactory { + return factory + } +} + +func (f multiplexedFactory) Create( + p beat.Pipeline, + config *common.Config, + meta *common.MapStrPointer, +) (Runner, error) { + factory, err := f.findFactory(config) + if err != nil { + return nil, err + } + return factory.Create(p, config, meta) +} + +func (f multiplexedFactory) CheckConfig(c *common.Config) error { + factory, err := f.findFactory(c) + if err == nil { + err = factory.CheckConfig(c) + } + return err +} + +func (f multiplexedFactory) findFactory(c *common.Config) (RunnerFactory, error) { + for _, matcher := range f { + if factory := matcher(c); factory != nil { + return factory, nil + } + } + + return nil, errConfigDoesNotMatch +} diff --git a/libbeat/cfgfile/reload.go b/libbeat/cfgfile/reload.go index 6b1be3d16e57..9153f81ba0c6 100644 --- a/libbeat/cfgfile/reload.go +++ b/libbeat/cfgfile/reload.go @@ -69,23 +69,19 @@ type Reload struct { Enabled bool `config:"enabled"` } -// RunnerFactory is used for creating of new Runners +// RunnerFactory is used for validating generated configurations and creating +// of new Runners type RunnerFactory interface { + ConfigChecker Create(p beat.Pipeline, config *common.Config, meta *common.MapStrPointer) (Runner, error) } -// ConfigChecker is usually combined with a RunnerFactory for implementations that can check a config -// without a pipeline and metadata. +// ConfigChecker is usually combined with a RunnerFactory for implementations +// that can check a config without a pipeline and metadata. type ConfigChecker interface { CheckConfig(config *common.Config) error } -// CheckableRunnerFactory is the union of RunnerFactory and ConfigChecker. -type CheckableRunnerFactory interface { - RunnerFactory - ConfigChecker -} - // Runner is a simple interface providing a simple way to // Start and Stop Reloader type Runner interface { diff --git a/metricbeat/autodiscover/include.go b/metricbeat/autodiscover/include.go deleted file mode 100644 index 0843fe6c156f..000000000000 --- a/metricbeat/autodiscover/include.go +++ /dev/null @@ -1,26 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package autodiscover - -import ( - // include all metricbeat specific builders - _ "github.com/elastic/beats/v7/metricbeat/autodiscover/builder/hints" - - // include all metricbeat specific appenders - _ "github.com/elastic/beats/v7/metricbeat/autodiscover/appender/kubernetes/token" -) diff --git a/metricbeat/beater/metricbeat.go b/metricbeat/beater/metricbeat.go index b396ac19c4bf..d13f4ebd29f4 100644 --- a/metricbeat/beater/metricbeat.go +++ b/metricbeat/beater/metricbeat.go @@ -33,8 +33,11 @@ import ( "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/module" - // Add autodiscover builders / appenders - _ "github.com/elastic/beats/v7/metricbeat/autodiscover" + // include all metricbeat specific builders + _ "github.com/elastic/beats/v7/metricbeat/autodiscover/builder/hints" + + // include all metricbeat specific appenders + _ "github.com/elastic/beats/v7/metricbeat/autodiscover/appender/kubernetes/token" // Add metricbeat default processors _ "github.com/elastic/beats/v7/metricbeat/processor/add_kubernetes_metadata" @@ -177,8 +180,14 @@ func newMetricbeat(b *beat.Beat, c *common.Config, options ...Option) (*Metricbe if config.Autodiscover != nil { var err error - adapter := autodiscover.NewFactoryAdapter(factory) - metricbeat.autodiscover, err = autodiscover.NewAutodiscover("metricbeat", b.Publisher, adapter, config.Autodiscover) + metricbeat.autodiscover, err = autodiscover.NewAutodiscover( + "metricbeat", + b.Publisher, + factory, + autodiscover.ConfigEvents(), + autodiscover.QueryConfig(), + config.Autodiscover, + ) if err != nil { return nil, err } From 4259281fab709fc536af2015e9122f9b5d71fe0c Mon Sep 17 00:00:00 2001 From: urso Date: Sat, 29 Feb 2020 15:16:40 +0100 Subject: [PATCH 02/14] add missing license header --- libbeat/cfgfile/factories.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/libbeat/cfgfile/factories.go b/libbeat/cfgfile/factories.go index 907de8a24784..cf4c85dea0cf 100644 --- a/libbeat/cfgfile/factories.go +++ b/libbeat/cfgfile/factories.go @@ -1,3 +1,20 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package cfgfile import ( From d69ad6c1d833e96ef99a40b63429718cd2c8377e Mon Sep 17 00:00:00 2001 From: urso Date: Mon, 2 Mar 2020 21:26:23 +0100 Subject: [PATCH 03/14] fix build --- libbeat/autodiscover/autodiscover_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libbeat/autodiscover/autodiscover_test.go b/libbeat/autodiscover/autodiscover_test.go index a1d5c8f2286b..f26caee15ebd 100644 --- a/libbeat/autodiscover/autodiscover_test.go +++ b/libbeat/autodiscover/autodiscover_test.go @@ -166,7 +166,7 @@ func TestAutodiscover(t *testing.T) { } // Create autodiscover manager - autodiscover, err := NewAutodiscover("test", nil, &adapter, &config) + autodiscover, err := NewAutodiscover("test", nil, &adapter, &adapter, &adapter, &config) if err != nil { t.Fatal(err) } @@ -359,7 +359,7 @@ func TestAutodiscoverWithConfigCheckFailures(t *testing.T) { } // Create autodiscover manager - autodiscover, err := NewAutodiscover("test", nil, &adapter, &config) + autodiscover, err := NewAutodiscover("test", nil, &adapter, &adapter, &adapter, &config) if err != nil { t.Fatal(err) } From d2d13af47518919cf8629a3fb35d9f7635d817a0 Mon Sep 17 00:00:00 2001 From: urso Date: Mon, 2 Mar 2020 21:37:15 +0100 Subject: [PATCH 04/14] remove EventSelector --- filebeat/beater/filebeat.go | 1 - heartbeat/beater/heartbeat.go | 8 +------ libbeat/autodiscover/autodiscover.go | 13 +++-------- libbeat/autodiscover/autodiscover_test.go | 6 ++--- libbeat/autodiscover/eventselect.go | 28 +++++------------------ metricbeat/beater/metricbeat.go | 8 +------ 6 files changed, 14 insertions(+), 50 deletions(-) diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 5e82c53f7a4e..c03e01381ed8 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -315,7 +315,6 @@ func (fb *Filebeat) Run(b *beat.Beat) error { cfgfile.MatchHasField("module", moduleLoader), cfgfile.MatchDefault(inputLoader), ), - autodiscover.ConfigEvents(), autodiscover.QueryConfig(), config.Autodiscover, ) diff --git a/heartbeat/beater/heartbeat.go b/heartbeat/beater/heartbeat.go index 1a09e718d133..dbdfe1dd740f 100644 --- a/heartbeat/beater/heartbeat.go +++ b/heartbeat/beater/heartbeat.go @@ -156,13 +156,7 @@ func (bt *Heartbeat) RunReloadableMonitors(b *beat.Beat) (err error) { // makeAutodiscover creates an autodiscover object ready to be started. func (bt *Heartbeat) makeAutodiscover(b *beat.Beat) (*autodiscover.Autodiscover, error) { - return autodiscover.NewAutodiscover( - "heartbeat", b.Publisher, - bt.dynamicFactory, - autodiscover.ConfigEvents(), - autodiscover.QueryConfig(), - bt.config.Autodiscover, - ) + return autodiscover.NewAutodiscover("heartbeat", b.Publisher, bt.dynamicFactory, autodiscover.QueryConfig(), bt.config.Autodiscover) } // Stop stops the beat. diff --git a/libbeat/autodiscover/autodiscover.go b/libbeat/autodiscover/autodiscover.go index 4805b45af3fc..a941019884fe 100644 --- a/libbeat/autodiscover/autodiscover.go +++ b/libbeat/autodiscover/autodiscover.go @@ -39,14 +39,10 @@ const ( // TODO autodiscover providers config reload -// EventSelector returns the bus filter to retrieve runner start/stop triggering events -type EventSelector interface { - EventFilter() []string -} - // EventConfigurer generates a valid list of configs from the given event, the -// received event will have all keys defined by `StartFilter` +// received event will have all keys defined by `StartFilter`. type EventConfigurer interface { + EventFilter() []string CreateConfig(bus.Event) ([]*common.Config, error) } @@ -56,7 +52,6 @@ type Autodiscover struct { bus bus.Bus defaultPipeline beat.Pipeline factory cfgfile.RunnerFactory - selector EventSelector configurer EventConfigurer providers []Provider configs map[string]map[uint64]*reload.ConfigWithMeta @@ -71,7 +66,6 @@ func NewAutodiscover( name string, pipeline beat.Pipeline, factory cfgfile.RunnerFactory, - selector EventSelector, configurer EventConfigurer, config *Config, ) (*Autodiscover, error) { @@ -95,7 +89,6 @@ func NewAutodiscover( bus: bus, defaultPipeline: pipeline, factory: factory, - selector: selector, configurer: configurer, configs: map[string]map[uint64]*reload.ConfigWithMeta{}, runners: cfgfile.NewRunnerList("autodiscover", factory, pipeline), @@ -112,7 +105,7 @@ func (a *Autodiscover) Start() { } a.logger.Info("Starting autodiscover manager") - a.listener = a.bus.Subscribe(a.selector.EventFilter()...) + a.listener = a.bus.Subscribe(a.configurer.EventFilter()...) // It is important to start the worker first before starting the producer. // In hosts that have large number of workloads, it is easy to have an initial diff --git a/libbeat/autodiscover/autodiscover_test.go b/libbeat/autodiscover/autodiscover_test.go index f26caee15ebd..65f0e744e60e 100644 --- a/libbeat/autodiscover/autodiscover_test.go +++ b/libbeat/autodiscover/autodiscover_test.go @@ -166,7 +166,7 @@ func TestAutodiscover(t *testing.T) { } // Create autodiscover manager - autodiscover, err := NewAutodiscover("test", nil, &adapter, &adapter, &adapter, &config) + autodiscover, err := NewAutodiscover("test", nil, &adapter, &adapter, &config) if err != nil { t.Fatal(err) } @@ -293,7 +293,7 @@ func TestAutodiscoverHash(t *testing.T) { } // Create autodiscover manager - autodiscover, err := NewAutodiscover("test", nil, &adapter, &adapter, &adapter, &config) + autodiscover, err := NewAutodiscover("test", nil, &adapter, &adapter, &config) if err != nil { t.Fatal(err) } @@ -359,7 +359,7 @@ func TestAutodiscoverWithConfigCheckFailures(t *testing.T) { } // Create autodiscover manager - autodiscover, err := NewAutodiscover("test", nil, &adapter, &adapter, &adapter, &config) + autodiscover, err := NewAutodiscover("test", nil, &adapter, &adapter, &config) if err != nil { t.Fatal(err) } diff --git a/libbeat/autodiscover/eventselect.go b/libbeat/autodiscover/eventselect.go index a8a9171e2f39..27a21dc740b3 100644 --- a/libbeat/autodiscover/eventselect.go +++ b/libbeat/autodiscover/eventselect.go @@ -24,40 +24,24 @@ import ( "github.com/elastic/beats/v7/libbeat/common/bus" ) -type selectedEvents []string - type queryConfigFrom string -// SelectEvents creates an event selector that will filter -// for the configured fields only. -func SelectEvents(events ...string) EventSelector { - if len(events) == 0 { - events = []string{"config"} - } - return selectedEvents(events) -} - -// ConfigEvents creates an EventSelector that will match "config" events only. -func ConfigEvents() EventSelector { - return SelectEvents("config") -} - -func (s selectedEvents) EventFilter() []string { - return []string(s) -} +var defaultConfigQuery = queryConfigFrom("config") // EventConfigQuery creates an EventConfigurer that tries to cast the given event // field from from the buf event into a []*common.Config. func EventConfigQuery(field string) EventConfigurer { - if field == "" { - field = "config" + if field == "" || field == "config" { + return defaultConfigQuery } return queryConfigFrom(field) } // QueryConfig extract an array of *common.Config from bus.Event. // The configurations are expected to be in the 'config' field. -func QueryConfig() EventConfigurer { return EventConfigQuery("config") } +func QueryConfig() EventConfigurer { return defaultConfigQuery } + +func (q queryConfigFrom) EventFilter() []string { return []string{string(q)} } func (q queryConfigFrom) CreateConfig(e bus.Event) ([]*common.Config, error) { config, ok := e[string(q)].([]*common.Config) diff --git a/metricbeat/beater/metricbeat.go b/metricbeat/beater/metricbeat.go index d13f4ebd29f4..050e7f265c15 100644 --- a/metricbeat/beater/metricbeat.go +++ b/metricbeat/beater/metricbeat.go @@ -181,13 +181,7 @@ func newMetricbeat(b *beat.Beat, c *common.Config, options ...Option) (*Metricbe if config.Autodiscover != nil { var err error metricbeat.autodiscover, err = autodiscover.NewAutodiscover( - "metricbeat", - b.Publisher, - factory, - autodiscover.ConfigEvents(), - autodiscover.QueryConfig(), - config.Autodiscover, - ) + "metricbeat", b.Publisher, factory, autodiscover.QueryConfig(), config.Autodiscover) if err != nil { return nil, err } From 639b367b937702ecef047b2f3e6e2933e7d5d51a Mon Sep 17 00:00:00 2001 From: urso Date: Mon, 2 Mar 2020 21:54:17 +0100 Subject: [PATCH 05/14] Add godoc to cfgfile/factories.go --- libbeat/cfgfile/factories.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/libbeat/cfgfile/factories.go b/libbeat/cfgfile/factories.go index cf4c85dea0cf..93a31165fa68 100644 --- a/libbeat/cfgfile/factories.go +++ b/libbeat/cfgfile/factories.go @@ -26,14 +26,19 @@ import ( type multiplexedFactory []FactoryMatcher +// FactoryMatcher returns a RunnerFactory that should be used to +// handle the given configuration. type FactoryMatcher func(cfg *common.Config) RunnerFactory var errConfigDoesNotMatch = errors.New("config does not match accepted configurations") +// MultiplexedRunnerFactory is a RunnerFactory that uses a list of +// FactoryMatcher to choose which RunnerFactory should handle the configuration. func MultiplexedRunnerFactory(matchers ...FactoryMatcher) RunnerFactory { return multiplexedFactory(matchers) } +// MatchHasField returns the configured RunnerFactory if the configation contains the configured field. func MatchHasField(field string, factory RunnerFactory) FactoryMatcher { return func(cfg *common.Config) RunnerFactory { if cfg.HasField(field) { @@ -43,6 +48,7 @@ func MatchHasField(field string, factory RunnerFactory) FactoryMatcher { } } +// MatchDefault always returns the configured runner factory. func MatchDefault(factory RunnerFactory) FactoryMatcher { return func(cfg *common.Config) RunnerFactory { return factory From 90577b0d8e100e0c272ee27648ec70768bacc50a Mon Sep 17 00:00:00 2001 From: urso Date: Thu, 5 Mar 2020 17:48:38 +0100 Subject: [PATCH 06/14] wrap error to show where it comes from in reloader --- libbeat/cfgfile/reload.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/cfgfile/reload.go b/libbeat/cfgfile/reload.go index 9153f81ba0c6..7fc3b9485f27 100644 --- a/libbeat/cfgfile/reload.go +++ b/libbeat/cfgfile/reload.go @@ -141,7 +141,7 @@ func (rl *Reloader) Check(runnerFactory RunnerFactory) error { // Load all config objects configs, err := rl.loadConfigs(files) if err != nil { - return err + return errors.Wrap(err, "loading configs") } debugf("Number of module configs found: %v", len(configs)) From ec8cbc6eac691397679bd635f7c538f23c7a3ac7 Mon Sep 17 00:00:00 2001 From: urso Date: Thu, 5 Mar 2020 17:49:12 +0100 Subject: [PATCH 07/14] More crawler improvements: - wrap errors to give some context which operation failed - do not leak inputloader background process if configuration fails --- filebeat/beater/crawler.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/filebeat/beater/crawler.go b/filebeat/beater/crawler.go index 99cd952e4e44..ede58e25bcf8 100644 --- a/filebeat/beater/crawler.go +++ b/filebeat/beater/crawler.go @@ -76,27 +76,32 @@ func (c *crawler) Start( for _, inputConfig := range c.inputConfigs { err := c.startInput(pipeline, inputConfig, r.GetStates()) if err != nil { - return err + return fmt.Errorf("starting input failed: %+v", err) } } if configInputs.Enabled() { c.inputReloader = cfgfile.NewReloader(pipeline, configInputs) if err := c.inputReloader.Check(c.inputsFactory); err != nil { - return err + return fmt.Errorf("creating input reloader failed: %+v", err) } - go func() { - c.inputReloader.Run(c.inputsFactory) - }() } if configModules.Enabled() { c.modulesReloader = cfgfile.NewReloader(pipeline, configModules) if err := c.modulesReloader.Check(c.modulesFactory); err != nil { - return err + return fmt.Errorf("creating module reloader failed: %+v", err) } + } + + if c.inputReloader != nil { + go func() { + c.inputReloader.Run(c.inputsFactory) + }() + } + if c.modulesReloader != nil { go func() { c.modulesReloader.Run(c.modulesFactory) }() From 2ea301fec1442e4cfa2dccc77475c7b2b21e337e Mon Sep 17 00:00:00 2001 From: urso Date: Thu, 5 Mar 2020 17:50:23 +0100 Subject: [PATCH 08/14] filebeat beater wrap error --- filebeat/beater/filebeat.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index c03e01381ed8..226646150e41 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -286,7 +286,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { err = crawler.Start(b.Publisher, registrar, config.ConfigInput, config.ConfigModules) if err != nil { crawler.Stop() - return err + return fmt.Errorf("Failed to start crawler: %+v", err) } // If run once, add crawler completion check as alternative to done signal From 33c182a80bb090ff5f0768dbf36a3e4120e83b53 Mon Sep 17 00:00:00 2001 From: urso Date: Thu, 5 Mar 2020 18:08:56 +0100 Subject: [PATCH 09/14] RunnerFactory.Create uses beat.PipelineConnector only --- filebeat/channel/connector.go | 2 +- filebeat/channel/factory.go | 2 +- filebeat/channel/interface.go | 2 +- filebeat/fileset/factory.go | 2 +- filebeat/fileset/setup.go | 2 +- filebeat/input/runnerfactory.go | 2 +- heartbeat/monitors/factory.go | 2 +- libbeat/cfgfile/factories.go | 2 +- libbeat/cfgfile/reload.go | 2 +- metricbeat/mb/module/connector.go | 4 ++-- metricbeat/mb/module/factory.go | 2 +- 11 files changed, 12 insertions(+), 12 deletions(-) diff --git a/filebeat/channel/connector.go b/filebeat/channel/connector.go index 8f253bb13230..73da881ec326 100644 --- a/filebeat/channel/connector.go +++ b/filebeat/channel/connector.go @@ -30,7 +30,7 @@ type ConnectorFunc func(*common.Config, beat.ClientConfig) (Outleter, error) type pipelineConnector struct { parent *OutletFactory - pipeline beat.Pipeline + pipeline beat.PipelineConnector } // Connect passes the cfg and the zero value of beat.ClientConfig to the underlying function. diff --git a/filebeat/channel/factory.go b/filebeat/channel/factory.go index 3bfeccc01505..b145c4a34f5f 100644 --- a/filebeat/channel/factory.go +++ b/filebeat/channel/factory.go @@ -87,7 +87,7 @@ func NewOutletFactory( // Inputs and all harvesters use the same pipeline client instance. // This guarantees ordering between events as required by the registrar for // file.State updates -func (f *OutletFactory) Create(p beat.Pipeline) Connector { +func (f *OutletFactory) Create(p beat.PipelineConnector) Connector { return &pipelineConnector{parent: f, pipeline: p} } diff --git a/filebeat/channel/interface.go b/filebeat/channel/interface.go index 0069cda6f020..9df9ff584b32 100644 --- a/filebeat/channel/interface.go +++ b/filebeat/channel/interface.go @@ -23,7 +23,7 @@ import ( ) // Factory is used to create a new Outlet instance -type Factory func(beat.Pipeline) Connector +type Factory func(beat.PipelineConnector) Connector // Connector creates an Outlet connecting the event publishing with some internal pipeline. // type Connector func(*common.Config, *common.MapStrPointer) (Outleter, error) diff --git a/filebeat/fileset/factory.go b/filebeat/fileset/factory.go index 70cde2079a53..82f81d6c2a9c 100644 --- a/filebeat/fileset/factory.go +++ b/filebeat/fileset/factory.go @@ -75,7 +75,7 @@ func NewFactory( } // Create creates a module based on a config -func (f *Factory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { +func (f *Factory) Create(p beat.PipelineConnector, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { // Start a registry of one module: m, err := NewModuleRegistry([]*common.Config{c}, f.beatInfo, false) if err != nil { diff --git a/filebeat/fileset/setup.go b/filebeat/fileset/setup.go index af02d8148561..6a058e9b6304 100644 --- a/filebeat/fileset/setup.go +++ b/filebeat/fileset/setup.go @@ -41,7 +41,7 @@ func NewSetupFactory(beatInfo beat.Info, pipelineLoaderFactory PipelineLoaderFac } // Create creates a new SetupCfgRunner to setup module configuration. -func (sf *SetupFactory) Create(_ beat.Pipeline, c *common.Config, _ *common.MapStrPointer) (cfgfile.Runner, error) { +func (sf *SetupFactory) Create(_ beat.PipelineConnector, c *common.Config, _ *common.MapStrPointer) (cfgfile.Runner, error) { m, err := NewModuleRegistry([]*common.Config{c}, sf.beatInfo, false) if err != nil { return nil, err diff --git a/filebeat/input/runnerfactory.go b/filebeat/input/runnerfactory.go index 89534a852697..8066f23a4ea3 100644 --- a/filebeat/input/runnerfactory.go +++ b/filebeat/input/runnerfactory.go @@ -43,7 +43,7 @@ func NewRunnerFactory(outlet channel.Factory, registrar *registrar.Registrar, be // Create creates a input based on a config func (r *RunnerFactory) Create( - pipeline beat.Pipeline, + pipeline beat.PipelineConnector, c *common.Config, meta *common.MapStrPointer, ) (cfgfile.Runner, error) { diff --git a/heartbeat/monitors/factory.go b/heartbeat/monitors/factory.go index b6c39d14d4d9..26937020dd93 100644 --- a/heartbeat/monitors/factory.go +++ b/heartbeat/monitors/factory.go @@ -37,7 +37,7 @@ func NewFactory(sched *scheduler.Scheduler, allowWatches bool) *RunnerFactory { } // Create makes a new Runner for a new monitor with the given Config. -func (f *RunnerFactory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { +func (f *RunnerFactory) Create(p beat.PipelineConnector, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { monitor, err := newMonitor(c, globalPluginsReg, p, f.sched, f.allowWatches, meta) return monitor, err } diff --git a/libbeat/cfgfile/factories.go b/libbeat/cfgfile/factories.go index 93a31165fa68..5ff4dc43e9f8 100644 --- a/libbeat/cfgfile/factories.go +++ b/libbeat/cfgfile/factories.go @@ -56,7 +56,7 @@ func MatchDefault(factory RunnerFactory) FactoryMatcher { } func (f multiplexedFactory) Create( - p beat.Pipeline, + p beat.PipelineConnector, config *common.Config, meta *common.MapStrPointer, ) (Runner, error) { diff --git a/libbeat/cfgfile/reload.go b/libbeat/cfgfile/reload.go index 7fc3b9485f27..60ee8891adf7 100644 --- a/libbeat/cfgfile/reload.go +++ b/libbeat/cfgfile/reload.go @@ -73,7 +73,7 @@ type Reload struct { // of new Runners type RunnerFactory interface { ConfigChecker - Create(p beat.Pipeline, config *common.Config, meta *common.MapStrPointer) (Runner, error) + Create(p beat.PipelineConnector, config *common.Config, meta *common.MapStrPointer) (Runner, error) } // ConfigChecker is usually combined with a RunnerFactory for implementations diff --git a/metricbeat/mb/module/connector.go b/metricbeat/mb/module/connector.go index 5d9c75a573c9..ea2292bd74cc 100644 --- a/metricbeat/mb/module/connector.go +++ b/metricbeat/mb/module/connector.go @@ -30,7 +30,7 @@ import ( // Connector configures and establishes a beat.Client for publishing events // to the publisher pipeline. type Connector struct { - pipeline beat.Pipeline + pipeline beat.PipelineConnector processors *processors.Processors eventMeta common.EventMetadata dynamicFields *common.MapStrPointer @@ -54,7 +54,7 @@ type metricSetRegister interface { } func NewConnector( - beatInfo beat.Info, pipeline beat.Pipeline, + beatInfo beat.Info, pipeline beat.PipelineConnector, c *common.Config, dynFields *common.MapStrPointer, ) (*Connector, error) { config := connectorConfig{} diff --git a/metricbeat/mb/module/factory.go b/metricbeat/mb/module/factory.go index 5392701ccb0f..9256fc5b5b11 100644 --- a/metricbeat/mb/module/factory.go +++ b/metricbeat/mb/module/factory.go @@ -40,7 +40,7 @@ func NewFactory(beatInfo beat.Info, options ...Option) *Factory { } // Create creates a new metricbeat module runner reporting events to the passed pipeline. -func (r *Factory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { +func (r *Factory) Create(p beat.PipelineConnector, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { module, metricSets, err := mb.NewModule(c, mb.Registry) if err != nil { return nil, err From 71ee8ca16ce2eae2a940a326db47d8d6f2670ba7 Mon Sep 17 00:00:00 2001 From: urso Date: Thu, 5 Mar 2020 18:27:17 +0100 Subject: [PATCH 10/14] add nilpipeline --- libbeat/publisher/pipeline/nilpipeline.go | 62 +++++++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 libbeat/publisher/pipeline/nilpipeline.go diff --git a/libbeat/publisher/pipeline/nilpipeline.go b/libbeat/publisher/pipeline/nilpipeline.go new file mode 100644 index 000000000000..3f48c5354cd7 --- /dev/null +++ b/libbeat/publisher/pipeline/nilpipeline.go @@ -0,0 +1,62 @@ +package pipeline + +import "github.com/elastic/beats/v7/libbeat/beat" + +type NilPipeline struct{} + +type nilClient struct { + eventer beat.ClientEventer + ackCount func(int) + ackEvents func([]interface{}) + ackLastEvent func(interface{}) +} + +var _nilPipeline = (*NilPipeline)(nil) + +func NewNilPipeline() *NilPipeline { return _nilPipeline } + +func (p *NilPipeline) Connect() (beat.Client, error) { + return p.ConnectWith(beat.ClientConfig{}) +} + +func (p *NilPipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { + return &nilClient{ + eventer: cfg.Events, + ackCount: cfg.ACKCount, + ackEvents: cfg.ACKEvents, + ackLastEvent: cfg.ACKLastEvent, + }, nil +} + +func (c *nilClient) Publish(event beat.Event) { + c.PublishAll([]beat.Event{event}) +} + +func (c *nilClient) PublishAll(events []beat.Event) { + L := len(events) + if L == 0 { + return + } + + if c.ackLastEvent != nil { + c.ackLastEvent(events[L-1].Private) + } + if c.ackEvents != nil { + tmp := make([]interface{}, L) + for i := range events { + tmp[i] = events[i].Private + } + c.ackEvents(tmp) + } + if c.ackCount != nil { + c.ackCount(L) + } +} + +func (c *nilClient) Close() error { + if c.eventer != nil { + c.eventer.Closing() + c.eventer.Closed() + } + return nil +} From daf9964b9e2bf07e75f5b9d3b18dbf23cf397a87 Mon Sep 17 00:00:00 2001 From: urso Date: Thu, 5 Mar 2020 18:40:03 +0100 Subject: [PATCH 11/14] Implement CheckConfig for filebeat inputs/fileset factories --- filebeat/fileset/factory.go | 4 +++- filebeat/fileset/setup.go | 4 +++- filebeat/input/runnerfactory.go | 6 ++++-- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/filebeat/fileset/factory.go b/filebeat/fileset/factory.go index 82f81d6c2a9c..46e86d7a33f1 100644 --- a/filebeat/fileset/factory.go +++ b/filebeat/fileset/factory.go @@ -27,6 +27,7 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" + pubpipeline "github.com/elastic/beats/v7/libbeat/publisher/pipeline" "github.com/mitchellh/hashstructure" ) @@ -115,7 +116,8 @@ func (f *Factory) Create(p beat.PipelineConnector, c *common.Config, meta *commo } func (f *Factory) CheckConfig(c *common.Config) error { - return nil + _, err := f.Create(pubpipeline.NewNilPipeline(), c, nil) + return err } func (p *inputsRunner) Start() { diff --git a/filebeat/fileset/setup.go b/filebeat/fileset/setup.go index 6a058e9b6304..027e9506dc0c 100644 --- a/filebeat/fileset/setup.go +++ b/filebeat/fileset/setup.go @@ -22,6 +22,7 @@ import ( "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + pubpipeline "github.com/elastic/beats/v7/libbeat/publisher/pipeline" ) // SetupFactory is for loading module assets when running setup subcommand. @@ -55,7 +56,8 @@ func (sf *SetupFactory) Create(_ beat.PipelineConnector, c *common.Config, _ *co } func (sf *SetupFactory) CheckConfig(c *common.Config) error { - return nil + _, err := sf.Create(pubpipeline.NewNilPipeline(), c, nil) + return err } // SetupCfgRunner is for loading assets of modules. diff --git a/filebeat/input/runnerfactory.go b/filebeat/input/runnerfactory.go index 8066f23a4ea3..a47e848bc6d4 100644 --- a/filebeat/input/runnerfactory.go +++ b/filebeat/input/runnerfactory.go @@ -23,6 +23,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/publisher/pipeline" ) // RunnerFactory is a factory for registrars @@ -57,6 +58,7 @@ func (r *RunnerFactory) Create( return p, nil } -func (r *RunnerFactory) CheckConfig(_ *common.Config) error { - return nil +func (r *RunnerFactory) CheckConfig(cfg *common.Config) error { + _, err := r.Create(pipeline.NewNilPipeline(), cfg, nil) + return err } From 598701665a56c80b3572d35f07646014417f0144 Mon Sep 17 00:00:00 2001 From: urso Date: Thu, 5 Mar 2020 20:50:00 +0100 Subject: [PATCH 12/14] fix some tests --- libbeat/autodiscover/autodiscover_test.go | 2 +- libbeat/cfgfile/list_test.go | 2 +- libbeat/publisher/pipeline/nilpipeline.go | 17 +++++++++++++++++ 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/libbeat/autodiscover/autodiscover_test.go b/libbeat/autodiscover/autodiscover_test.go index 65f0e744e60e..182cd99f3cb5 100644 --- a/libbeat/autodiscover/autodiscover_test.go +++ b/libbeat/autodiscover/autodiscover_test.go @@ -92,7 +92,7 @@ func (m *mockAdapter) CheckConfig(c *common.Config) error { return nil } -func (m *mockAdapter) Create(_ beat.Pipeline, config *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { +func (m *mockAdapter) Create(_ beat.PipelineConnector, config *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { runner := &mockRunner{ config: config, meta: meta, diff --git a/libbeat/cfgfile/list_test.go b/libbeat/cfgfile/list_test.go index b9ae56878318..9d28187a5034 100644 --- a/libbeat/cfgfile/list_test.go +++ b/libbeat/cfgfile/list_test.go @@ -43,7 +43,7 @@ func (r *runner) Stop() { r.stopped = true } type runnerFactory struct{ runners []*runner } -func (r *runnerFactory) Create(x beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (Runner, error) { +func (r *runnerFactory) Create(x beat.PipelineConnector, c *common.Config, meta *common.MapStrPointer) (Runner, error) { config := struct { ID int64 `config:"id"` }{} diff --git a/libbeat/publisher/pipeline/nilpipeline.go b/libbeat/publisher/pipeline/nilpipeline.go index 3f48c5354cd7..170f2909f205 100644 --- a/libbeat/publisher/pipeline/nilpipeline.go +++ b/libbeat/publisher/pipeline/nilpipeline.go @@ -1,3 +1,20 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package pipeline import "github.com/elastic/beats/v7/libbeat/beat" From 83a44718fc528a2fcaab3b072c1d330d80233bbc Mon Sep 17 00:00:00 2001 From: urso Date: Tue, 10 Mar 2020 21:40:45 +0100 Subject: [PATCH 13/14] godoc --- libbeat/autodiscover/autodiscover.go | 12 ++++++++---- libbeat/autodiscover/eventselect.go | 5 +++-- libbeat/cfgfile/factories.go | 17 +++++++++++++---- libbeat/cfgfile/reload.go | 17 +++++------------ libbeat/publisher/pipeline/nilpipeline.go | 14 +++++++++----- 5 files changed, 38 insertions(+), 27 deletions(-) diff --git a/libbeat/autodiscover/autodiscover.go b/libbeat/autodiscover/autodiscover.go index a941019884fe..c4941c4b176f 100644 --- a/libbeat/autodiscover/autodiscover.go +++ b/libbeat/autodiscover/autodiscover.go @@ -37,12 +37,16 @@ const ( retryPeriod = 10 * time.Second ) -// TODO autodiscover providers config reload - -// EventConfigurer generates a valid list of configs from the given event, the -// received event will have all keys defined by `StartFilter`. +// EventConfigurer is used to configure the creation of configuration objects +// from the autodiscover event bus. type EventConfigurer interface { + // EventFilter returns the bus filter to retrieve runner start/stop triggering + // events. The bus will filter events to the ones, that contain *all* the + // the required top-level keys. EventFilter() []string + + // CreateConfig creates a list of configurations from a bus.Event. The + // received event will have all keys defined in `EventFilter`. CreateConfig(bus.Event) ([]*common.Config, error) } diff --git a/libbeat/autodiscover/eventselect.go b/libbeat/autodiscover/eventselect.go index 27a21dc740b3..efc196561cb6 100644 --- a/libbeat/autodiscover/eventselect.go +++ b/libbeat/autodiscover/eventselect.go @@ -44,9 +44,10 @@ func QueryConfig() EventConfigurer { return defaultConfigQuery } func (q queryConfigFrom) EventFilter() []string { return []string{string(q)} } func (q queryConfigFrom) CreateConfig(e bus.Event) ([]*common.Config, error) { - config, ok := e[string(q)].([]*common.Config) + fieldName := string(q)j + config, ok := e[fieldName].([]*common.Config) if !ok { - return nil, errors.New("Got a wrong value in event `config` key") + return nil, fmt.Errorf("Event field '%q' does not contain a valid configuration object", fieldname) } return config, nil } diff --git a/libbeat/cfgfile/factories.go b/libbeat/cfgfile/factories.go index 5ff4dc43e9f8..095a70772050 100644 --- a/libbeat/cfgfile/factories.go +++ b/libbeat/cfgfile/factories.go @@ -26,19 +26,27 @@ import ( type multiplexedFactory []FactoryMatcher -// FactoryMatcher returns a RunnerFactory that should be used to -// handle the given configuration. +// FactoryMatcher returns a RunnerFactory that can handle the given +// configuration if it supports it, otherwise it returns nil. type FactoryMatcher func(cfg *common.Config) RunnerFactory var errConfigDoesNotMatch = errors.New("config does not match accepted configurations") // MultiplexedRunnerFactory is a RunnerFactory that uses a list of // FactoryMatcher to choose which RunnerFactory should handle the configuration. +// When presented a Config object, MultiplexedRunnerFactory will query the +// matchers in the order given. The first RunnerFactory returned will be used +// to create the runner. +// Creating a runner or checking a configuration will return an error if no +// matcher was found. Use MatchDefault as last argument to +// MultiplexedRunnerFactory to configure a default RunnerFactory that shall +// always be used if no other factory was matched. func MultiplexedRunnerFactory(matchers ...FactoryMatcher) RunnerFactory { return multiplexedFactory(matchers) } -// MatchHasField returns the configured RunnerFactory if the configation contains the configured field. +// MatchHasField returns a FactoryMatcher that returns the given RunnerFactory +// when the input config contains the given field name. func MatchHasField(field string, factory RunnerFactory) FactoryMatcher { return func(cfg *common.Config) RunnerFactory { if cfg.HasField(field) { @@ -48,7 +56,8 @@ func MatchHasField(field string, factory RunnerFactory) FactoryMatcher { } } -// MatchDefault always returns the configured runner factory. +// MatchDefault return a FactoryMatcher that always returns returns the given +// RunnerFactory. func MatchDefault(factory RunnerFactory) FactoryMatcher { return func(cfg *common.Config) RunnerFactory { return factory diff --git a/libbeat/cfgfile/reload.go b/libbeat/cfgfile/reload.go index 60ee8891adf7..990264c3a6be 100644 --- a/libbeat/cfgfile/reload.go +++ b/libbeat/cfgfile/reload.go @@ -72,13 +72,12 @@ type Reload struct { // RunnerFactory is used for validating generated configurations and creating // of new Runners type RunnerFactory interface { - ConfigChecker + // Create creates a new Runner based on the given configuration. Create(p beat.PipelineConnector, config *common.Config, meta *common.MapStrPointer) (Runner, error) -} -// ConfigChecker is usually combined with a RunnerFactory for implementations -// that can check a config without a pipeline and metadata. -type ConfigChecker interface { + // CheckConfig tests if a confiugation can be used to create an input. If it + // is not possible to create an input using the configuration, an error must + // be returned. CheckConfig(config *common.Config) error } @@ -153,13 +152,7 @@ func (rl *Reloader) Check(runnerFactory RunnerFactory) error { continue } - if checker, ok := runnerFactory.(ConfigChecker); ok { - err = checker.CheckConfig(c.Config) - } else { - _, err = runnerFactory.Create(rl.pipeline, c.Config, c.Meta) - } - - if err != nil { + if err = runnerFactory.CheckConfig(c.Config); err != nil { return err } } diff --git a/libbeat/publisher/pipeline/nilpipeline.go b/libbeat/publisher/pipeline/nilpipeline.go index 170f2909f205..f32785a8d22b 100644 --- a/libbeat/publisher/pipeline/nilpipeline.go +++ b/libbeat/publisher/pipeline/nilpipeline.go @@ -19,7 +19,7 @@ package pipeline import "github.com/elastic/beats/v7/libbeat/beat" -type NilPipeline struct{} +type nilPipeline struct{} type nilClient struct { eventer beat.ClientEventer @@ -28,15 +28,19 @@ type nilClient struct { ackLastEvent func(interface{}) } -var _nilPipeline = (*NilPipeline)(nil) +var _nilPipeline = (*nilPipeline)(nil) -func NewNilPipeline() *NilPipeline { return _nilPipeline } +// NewNilPipeline returns a new pipeline that is compatible with +// beats.PipelineConnector. The pipeline will discard all events that have been +// published. Client ACK handlers will still be executed, but the callbacks +// will be executed immediately when the event is published. +func NewNilPipeline() beat.PipelineConnector { return _nilPipeline } -func (p *NilPipeline) Connect() (beat.Client, error) { +func (p *nilPipeline) Connect() (beat.Client, error) { return p.ConnectWith(beat.ClientConfig{}) } -func (p *NilPipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { +func (p *nilPipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { return &nilClient{ eventer: cfg.Events, ackCount: cfg.ACKCount, From ba9f2777723fd38f136d03f3fa3fc75c05e9e89c Mon Sep 17 00:00:00 2001 From: urso Date: Tue, 10 Mar 2020 22:30:09 +0100 Subject: [PATCH 14/14] fix eventselect --- libbeat/autodiscover/eventselect.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/libbeat/autodiscover/eventselect.go b/libbeat/autodiscover/eventselect.go index efc196561cb6..670fc84ed602 100644 --- a/libbeat/autodiscover/eventselect.go +++ b/libbeat/autodiscover/eventselect.go @@ -18,7 +18,7 @@ package autodiscover import ( - "errors" + "fmt" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/bus" @@ -44,10 +44,10 @@ func QueryConfig() EventConfigurer { return defaultConfigQuery } func (q queryConfigFrom) EventFilter() []string { return []string{string(q)} } func (q queryConfigFrom) CreateConfig(e bus.Event) ([]*common.Config, error) { - fieldName := string(q)j + fieldName := string(q) config, ok := e[fieldName].([]*common.Config) if !ok { - return nil, fmt.Errorf("Event field '%q' does not contain a valid configuration object", fieldname) + return nil, fmt.Errorf("Event field '%v' does not contain a valid configuration object", fieldName) } return config, nil }