diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 991ae798480e..3f59ee2de47b 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -22,8 +22,6 @@ import ( "fmt" "strings" - "github.com/elastic/beats/libbeat/common/reload" - "github.com/pkg/errors" "github.com/elastic/beats/libbeat/autodiscover" @@ -31,6 +29,7 @@ import ( "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" + "github.com/elastic/beats/libbeat/common/reload" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/management" "github.com/elastic/beats/libbeat/monitoring" diff --git a/filebeat/filebeat.spec b/filebeat/filebeat.spec new file mode 100644 index 000000000000..76c08d472c7b --- /dev/null +++ b/filebeat/filebeat.spec @@ -0,0 +1 @@ +{"BinaryPath":"filebeat","Args":["-e"],"Configurable":"grpc"} diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index b5b739d55e37..5386ecf33023 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -618,7 +618,7 @@ func (b *Beat) configure(settings Settings) error { logp.Info("Beat ID: %v", b.Info.ID) // initialize config manager - b.ConfigManager, err = management.Factory()(b.Config.Management, reload.Register, b.Beat.Info.ID) + b.ConfigManager, err = management.Factory(b.Config.Management)(b.Config.Management, reload.Register, b.Beat.Info.ID) if err != nil { return err } diff --git a/libbeat/management/management.go b/libbeat/management/management.go index 5725eea77157..a14fae6dfe3e 100644 --- a/libbeat/management/management.go +++ b/libbeat/management/management.go @@ -31,6 +31,8 @@ var Namespace = "libbeat.management" // DebugK used as key for all things central management var DebugK = "centralmgmt" +var centralMgmtKey = "x-pack-cm" + // ConfigManager interacts with the beat to update configurations // from an external source type ConfigManager interface { @@ -47,32 +49,47 @@ type ConfigManager interface { CheckRawConfig(cfg *common.Config) error } +// PluginFunc for creating FactoryFunc if it matches a config +type PluginFunc func(*common.Config) FactoryFunc + // FactoryFunc for creating a config manager type FactoryFunc func(*common.Config, *reload.Registry, uuid.UUID) (ConfigManager, error) // Register a config manager -func Register(name string, fn FactoryFunc, stability feature.Stability) { +func Register(name string, fn PluginFunc, stability feature.Stability) { f := feature.New(Namespace, name, fn, feature.NewDetails(name, "", stability)) feature.MustRegister(f) } // Factory retrieves config manager constructor. If no one is registered // it will create a nil manager -func Factory() FactoryFunc { +func Factory(cfg *common.Config) FactoryFunc { factories, err := feature.GlobalRegistry().LookupAll(Namespace) if err != nil { return nilFactory } for _, f := range factories { - if factory, ok := f.Factory().(FactoryFunc); ok { - return factory + if plugin, ok := f.Factory().(PluginFunc); ok { + if factory := plugin(cfg); factory != nil { + return factory + } } } return nilFactory } +type modeConfig struct { + Mode string `config:"mode" yaml:"mode"` +} + +func defaultModeConfig() *modeConfig { + return &modeConfig{ + Mode: centralMgmtKey, + } +} + // nilManager, fallback when no manager is present type nilManager struct{} diff --git a/x-pack/agent/pkg/agent/configrequest/step.go b/x-pack/agent/pkg/agent/configrequest/step.go index faf313199fac..ea96090ec550 100644 --- a/x-pack/agent/pkg/agent/configrequest/step.go +++ b/x-pack/agent/pkg/agent/configrequest/step.go @@ -9,10 +9,6 @@ const ( StepRun = "sc-run" // StepRemove is a name of Remove program event causing beat in version to be uninstalled StepRemove = "sc-remove" - // StepStartSidecar is a name of start program monitoring event - StepStartSidecar = "sc-sidecar-start" - // StepStopSidecar is a name of stop program monitoring event - StepStopSidecar = "sc-sidecar-stop" // MetaConfigKey is key used to store configuration in metadata MetaConfigKey = "config" diff --git a/x-pack/agent/pkg/agent/operation/operator_handlers.go b/x-pack/agent/pkg/agent/operation/operator_handlers.go index 6495a6affba8..607b39a6da02 100644 --- a/x-pack/agent/pkg/agent/operation/operator_handlers.go +++ b/x-pack/agent/pkg/agent/operation/operator_handlers.go @@ -20,8 +20,6 @@ func (o *Operator) initHandlerMap() { hm[configrequest.StepRun] = o.handleRun hm[configrequest.StepRemove] = o.handleRemove - hm[configrequest.StepStartSidecar] = o.handleStartSidecar - hm[configrequest.StepStopSidecar] = o.handleStopSidecar o.handlers = hm } @@ -46,7 +44,7 @@ func (o *Operator) handleRemove(step configrequest.Step) error { p, _, err := getProgramFromStep(step) if err != nil { - return errors.Wrap(err, "operator.handleStart failed to create program") + return errors.Wrap(err, "operator.handleRemove failed to stop program") } return o.stop(p) @@ -67,15 +65,24 @@ func getProgramFromStepWithTags(step configrequest.Step, tags map[app.Tag]string } func getConfigFromStep(step configrequest.Step) (map[string]interface{}, error) { - metConfig, ok := step.Meta[configrequest.MetaConfigKey] - if !ok { + metConfig, hasConfig := step.Meta[configrequest.MetaConfigKey] + + if !hasConfig && needsMetaConfig(step) { return nil, fmt.Errorf("step: %s, no config in metadata", step.ID) } - config, ok := metConfig.(map[string]interface{}) - if !ok { - return nil, fmt.Errorf("step: %s, program config is in invalid format", step.ID) + var config map[string]interface{} + if hasConfig { + var ok bool + config, ok = metConfig.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("step: %s, program config is in invalid format", step.ID) + } } return config, nil } + +func needsMetaConfig(step configrequest.Step) bool { + return step.ID == configrequest.StepRun +} diff --git a/x-pack/filebeat/filebeat.spec b/x-pack/filebeat/filebeat.spec new file mode 100644 index 000000000000..76c08d472c7b --- /dev/null +++ b/x-pack/filebeat/filebeat.spec @@ -0,0 +1 @@ +{"BinaryPath":"filebeat","Args":["-e"],"Configurable":"grpc"} diff --git a/x-pack/libbeat/cmd/inject.go b/x-pack/libbeat/cmd/inject.go index fa5ccea9edbd..c02476c83fa2 100644 --- a/x-pack/libbeat/cmd/inject.go +++ b/x-pack/libbeat/cmd/inject.go @@ -10,8 +10,13 @@ import ( // register central management "github.com/elastic/beats/x-pack/libbeat/licenser" + + // Register Central Management _ "github.com/elastic/beats/x-pack/libbeat/management" + // Register fleet + _ "github.com/elastic/beats/x-pack/libbeat/management/fleet" + // register autodiscover providers _ "github.com/elastic/beats/x-pack/libbeat/autodiscover/providers/aws/elb" ) diff --git a/x-pack/libbeat/management/config.go b/x-pack/libbeat/management/config.go index 908f2bef1baa..3d8e62eaf750 100644 --- a/x-pack/libbeat/management/config.go +++ b/x-pack/libbeat/management/config.go @@ -68,11 +68,23 @@ const ManagedConfigTemplate = ` #monitoring.elasticsearch: ` -// Config for central management +const ( + // ModeCentralManagement is a default CM mode, using existing processes. + ModeCentralManagement = "x-pack-cm" + + // ModeFleet is a management mode where fleet is used to retrieve configurations. + ModeFleet = "x-pack-fleet" +) + +// Config for central management. type Config struct { // true when enrolled Enabled bool `config:"enabled" yaml:"enabled"` + // Mode specifies whether beat uses Central Management or Fleet. + // Options: [cm, fleet] + Mode string `config:"mode" yaml:"mode"` + // Poll configs period Period time.Duration `config:"period" yaml:"period"` @@ -93,6 +105,7 @@ type EventReporterConfig struct { func defaultConfig() *Config { return &Config{ + Mode: ModeCentralManagement, Period: 60 * time.Second, EventsReporter: EventReporterConfig{ Period: 30 * time.Second, @@ -111,7 +124,7 @@ type templateParams struct { BeatName string } -// OverwriteConfigFile will overwrite beat settings file with the enrolled template +// OverwriteConfigFile will overwrite beat settings file with the enrolled template. func (c *Config) OverwriteConfigFile(wr io.Writer, beatName string) error { t := template.Must(template.New("beat.management.yml").Parse(ManagedConfigTemplate)) diff --git a/x-pack/libbeat/management/error.go b/x-pack/libbeat/management/error.go index b4d5b6eaf23b..a9513a04391b 100644 --- a/x-pack/libbeat/management/error.go +++ b/x-pack/libbeat/management/error.go @@ -103,6 +103,7 @@ func (er *Errors) IsEmpty() bool { return len(*er) == 0 } -func newConfigError(err error) *Error { +// NewConfigError wraps an error to be a management error of a specific ConfigError Type +func NewConfigError(err error) *Error { return &Error{Type: ConfigError, Err: err} } diff --git a/x-pack/libbeat/management/error_test.go b/x-pack/libbeat/management/error_test.go index 0e40632630af..ed068749105c 100644 --- a/x-pack/libbeat/management/error_test.go +++ b/x-pack/libbeat/management/error_test.go @@ -54,14 +54,14 @@ func TestErrorSerialization(t *testing.T) { func TestErrors(t *testing.T) { t.Run("single error", func(t *testing.T) { - errors := Errors{newConfigError(errors.New("error1"))} + errors := Errors{NewConfigError(errors.New("error1"))} assert.Equal(t, "1 error: error1", errors.Error()) }) t.Run("multiple errors", func(t *testing.T) { errors := Errors{ - newConfigError(errors.New("error1")), - newConfigError(errors.New("error2")), + NewConfigError(errors.New("error1")), + NewConfigError(errors.New("error2")), } assert.Equal(t, "2 errors: error1; error2", errors.Error()) }) diff --git a/x-pack/libbeat/management/fleet/config.go b/x-pack/libbeat/management/fleet/config.go new file mode 100644 index 000000000000..29619d070fdf --- /dev/null +++ b/x-pack/libbeat/management/fleet/config.go @@ -0,0 +1,27 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package fleet + +import ( + xmanagement "github.com/elastic/beats/x-pack/libbeat/management" +) + +// Config for central management +type Config struct { + Enabled bool `config:"enabled" yaml:"enabled"` + Mode string `config:"mode" yaml:"mode"` + Blacklist xmanagement.ConfigBlacklistSettings `config:"blacklist" yaml:"blacklist"` +} + +func defaultConfig() *Config { + return &Config{ + Mode: xmanagement.ModeCentralManagement, + Blacklist: xmanagement.ConfigBlacklistSettings{ + Patterns: map[string]string{ + "output": "console|file", + }, + }, + } +} diff --git a/x-pack/libbeat/management/fleet/config_server.go b/x-pack/libbeat/management/fleet/config_server.go new file mode 100644 index 000000000000..45c9c54eaea8 --- /dev/null +++ b/x-pack/libbeat/management/fleet/config_server.go @@ -0,0 +1,56 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package fleet + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/x-pack/agent/pkg/core/remoteconfig/grpc" +) + +const ( + defaultTimeout = 15 * time.Second +) + +// Server is a server for handling communication between +// beat and Elastic Agent. +type Server struct { + configChan chan<- map[string]interface{} +} + +// NewConfigServer creates a new grpc configuration server for receiving +// configurations from Elastic Agent. +func NewConfigServer(configChan chan<- map[string]interface{}) *Server { + return &Server{ + configChan: configChan, + } +} + +// Config is a handler of a call made by agent pushing latest configuration. +func (s *Server) Config(ctx context.Context, req *grpc.ConfigRequest) (*grpc.ConfigResponse, error) { + cfgString := req.GetConfig() + + var configMap common.MapStr + uconfig, err := common.NewConfigFrom(cfgString) + if err != nil { + return &grpc.ConfigResponse{}, fmt.Errorf("config blocks unsuccessfully generated: %+v", err) + } + + err = uconfig.Unpack(&configMap) + if err != nil { + return &grpc.ConfigResponse{}, fmt.Errorf("config blocks unsuccessfully generated: %+v", err) + } + + select { + case s.configChan <- configMap: + case <-time.After(defaultTimeout): + return &grpc.ConfigResponse{}, errors.New("failed to push configuration: Timeout") + } + return &grpc.ConfigResponse{}, nil +} diff --git a/x-pack/libbeat/management/fleet/manager.go b/x-pack/libbeat/management/fleet/manager.go new file mode 100644 index 000000000000..0570f0e86fc7 --- /dev/null +++ b/x-pack/libbeat/management/fleet/manager.go @@ -0,0 +1,279 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package fleet + +import ( + "fmt" + "os" + "sort" + "sync" + + "github.com/gofrs/uuid" + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/cfgwarn" + "github.com/elastic/beats/libbeat/common/reload" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/management" + "github.com/elastic/beats/x-pack/agent/pkg/core/plugin/server" + "github.com/elastic/beats/x-pack/libbeat/management/api" + + xmanagement "github.com/elastic/beats/x-pack/libbeat/management" +) + +// ConfigManager provides a functionality to retrieve config channel +// using which manager is informed about config changes. +type ConfigManager interface { + ConfigChan() chan<- map[string]interface{} +} + +// Manager handles internal config updates. By retrieving +// new configs from Kibana and applying them to the Beat. +type Manager struct { + config *Config + logger *logp.Logger + beatUUID uuid.UUID + done chan struct{} + registry *reload.Registry + wg sync.WaitGroup + blacklist *xmanagement.ConfigBlacklist + + configChan chan map[string]interface{} +} + +// NewFleetManager returns a X-Pack Beats Fleet Management manager. +func NewFleetManager(config *common.Config, registry *reload.Registry, beatUUID uuid.UUID) (management.ConfigManager, error) { + c := defaultConfig() + if config.Enabled() { + if err := config.Unpack(&c); err != nil { + return nil, errors.Wrap(err, "parsing fleet management settings") + } + } + return NewFleetManagerWithConfig(c, registry, beatUUID) +} + +// NewFleetManagerWithConfig returns a X-Pack Beats Fleet Management manager. +func NewFleetManagerWithConfig(c *Config, registry *reload.Registry, beatUUID uuid.UUID) (management.ConfigManager, error) { + var blacklist *xmanagement.ConfigBlacklist + + if c.Enabled && c.Mode == xmanagement.ModeFleet { + var err error + + // Initialize configs blacklist + blacklist, err = xmanagement.NewConfigBlacklist(c.Blacklist) + if err != nil { + return nil, errors.Wrap(err, "wrong settings for configurations blacklist") + } + } + + log := logp.NewLogger(management.DebugK) + + m := &Manager{ + config: c, + blacklist: blacklist, + logger: log.Named("fleet"), + done: make(chan struct{}), + beatUUID: beatUUID, + registry: registry, + configChan: make(chan map[string]interface{}), + } + + go m.startGrpcServer() + + return m, nil +} + +// Enabled returns true if config management is enabled. +func (cm *Manager) Enabled() bool { + return cm.config.Enabled && cm.config.Mode == xmanagement.ModeFleet +} + +// ConfigChan returns a channel used to communicate configuration changes. +func (cm *Manager) ConfigChan() chan<- map[string]interface{} { + return cm.configChan +} + +// Start the config manager +func (cm *Manager) Start() { + if !cm.Enabled() { + return + } + + cfgwarn.Beta("Fleet management is enabled") + cm.logger.Info("Starting fleet management service") + + cm.wg.Add(1) + go cm.worker() +} + +// Stop the config manager +func (cm *Manager) Stop() { + if !cm.Enabled() { + return + } + + // stop collecting configuration + cm.logger.Info("Stopping fleet management service") + close(cm.done) + cm.wg.Wait() +} + +// CheckRawConfig check settings are correct to start the beat. This method +// checks there are no collision between the existing configuration and what +// fleet management can configure. +func (cm *Manager) CheckRawConfig(cfg *common.Config) error { + // TODO implement this method + return nil +} + +func (cm *Manager) worker() { + defer cm.wg.Done() + + // Start worker loop: fetch + apply new settings +WORKERLOOP: + for { + select { + case cfg := <-cm.configChan: + blocks, err := cm.toConfigBlocks(cfg) + if err != nil { + cm.logger.Errorf("Could not apply the configuration, error: %+v", err) + continue WORKERLOOP + } + + if errs := cm.apply(blocks); !errs.IsEmpty() { + cm.logger.Errorf("Could not apply the configuration, error: %+v", errs) + continue WORKERLOOP + } + case <-cm.done: + return + } + } +} + +func (cm *Manager) apply(blocks api.ConfigBlocks) xmanagement.Errors { + var errors xmanagement.Errors + missing := map[string]bool{} + for _, name := range cm.registry.GetRegisteredNames() { + missing[name] = true + } + + // Detect unwanted configs from the list + if errs := cm.blacklist.Detect(blocks); !errs.IsEmpty() { + errors = append(errors, errs...) + return errors + } + + // Reload configs + for _, b := range blocks { + if err := cm.reload(b.Type, b.Blocks); err != nil { + errors = append(errors, err) + } + missing[b.Type] = false + } + + // Unset missing configs + for name := range missing { + if missing[name] { + if err := cm.reload(name, []*api.ConfigBlock{}); err != nil { + errors = append(errors, err) + } + } + } + + return errors +} + +func (cm *Manager) reload(t string, blocks []*api.ConfigBlock) *xmanagement.Error { + cm.logger.Infof("Applying settings for %s", t) + if obj := cm.registry.GetReloadable(t); obj != nil { + // Single object + if len(blocks) > 1 { + err := fmt.Errorf("got an invalid number of configs for %s: %d, expected: 1", t, len(blocks)) + cm.logger.Error(err) + return xmanagement.NewConfigError(err) + } + + var config *reload.ConfigWithMeta + var err error + if len(blocks) == 1 { + config, err = blocks[0].ConfigWithMeta() + if err != nil { + cm.logger.Error(err) + return xmanagement.NewConfigError(err) + } + } + + if err := obj.Reload(config); err != nil { + cm.logger.Error(err) + return xmanagement.NewConfigError(err) + } + } else if obj := cm.registry.GetReloadableList(t); obj != nil { + // List + var configs []*reload.ConfigWithMeta + for _, block := range blocks { + config, err := block.ConfigWithMeta() + if err != nil { + cm.logger.Error(err) + return xmanagement.NewConfigError(err) + } + configs = append(configs, config) + } + + if err := obj.Reload(configs); err != nil { + cm.logger.Error(err) + return xmanagement.NewConfigError(err) + } + } + + return nil +} + +func (cm *Manager) toConfigBlocks(cfg common.MapStr) (api.ConfigBlocks, error) { + blocks := map[string][]*api.ConfigBlock{} + + // Extract all registered values beat can respond to + for _, regName := range cm.registry.GetRegisteredNames() { + iBlock, err := cfg.GetValue(regName) + if err != nil { + continue + } + + if mapBlock, ok := iBlock.(map[string]interface{}); ok { + blocks[regName] = append(blocks[regName], &api.ConfigBlock{Raw: mapBlock}) + } else if arrayBlock, ok := iBlock.([]interface{}); ok { + for _, item := range arrayBlock { + if mapBlock, ok := item.(map[string]interface{}); ok { + blocks[regName] = append(blocks[regName], &api.ConfigBlock{Raw: mapBlock}) + } + } + } + } + + // keep the ordering consistent while grouping the items. + keys := make([]string, 0, len(blocks)) + for k := range blocks { + keys = append(keys, k) + } + sort.Strings(keys) + + res := api.ConfigBlocks{} + for _, t := range keys { + b := blocks[t] + res = append(res, api.ConfigBlocksWithType{Type: t, Blocks: b}) + } + + return res, nil +} + +func (cm *Manager) startGrpcServer() { + cm.logger.Info("initiating fleet config manager") + s := NewConfigServer(cm.ConfigChan()) + if err := server.NewGrpcServer(os.Stdin, s); err != nil { + panic(err) + } +} + +var _ ConfigManager = &Manager{} diff --git a/x-pack/libbeat/management/fleet/manager_test.go b/x-pack/libbeat/management/fleet/manager_test.go new file mode 100644 index 000000000000..179497417b45 --- /dev/null +++ b/x-pack/libbeat/management/fleet/manager_test.go @@ -0,0 +1,60 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package fleet + +import ( + "testing" + + "github.com/elastic/beats/libbeat/common" + + "github.com/elastic/beats/libbeat/common/reload" +) + +func TestConfigBlocks(t *testing.T) { + input := ` +filebeat: + inputs: + - type: log + paths: + - /var/log/hello1.log + - /var/log/hello2.log +output: + elasticsearch: + hosts: + - localhost:9200` + + var cfg common.MapStr + uconfig, err := common.NewConfigFrom(input) + if err != nil { + t.Fatalf("Config blocks unsuccessfully generated: %+v", err) + } + + err = uconfig.Unpack(&cfg) + if err != nil { + t.Fatalf("Config blocks unsuccessfully generated: %+v", err) + } + + reg := reload.NewRegistry() + reg.Register("output", &dummyReloadable{}) + reg.Register("filebeat.inputs", &dummyReloadable{}) + + cm := &Manager{ + registry: reg, + } + blocks, err := cm.toConfigBlocks(cfg) + if err != nil { + t.Fatalf("Config blocks unsuccessfully generated: %+v", err) + } + + if len(blocks) != 2 { + t.Fatalf("Expected 2 block have %d: %+v", len(blocks), blocks) + } +} + +type dummyReloadable struct{} + +func (dummyReloadable) Reload(config *reload.ConfigWithMeta) error { + return nil +} diff --git a/x-pack/libbeat/management/fleet/plugin.go b/x-pack/libbeat/management/fleet/plugin.go new file mode 100644 index 000000000000..9a3ed7819765 --- /dev/null +++ b/x-pack/libbeat/management/fleet/plugin.go @@ -0,0 +1,32 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package fleet + +import ( + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/feature" + "github.com/elastic/beats/libbeat/management" + xmanagement "github.com/elastic/beats/x-pack/libbeat/management" +) + +func init() { + management.Register("x-pack-fleet", NewFleetManagerPlugin, feature.Beta) +} + +// NewFleetManagerPlugin creates a plugin function returning factory if configuration matches the criteria +func NewFleetManagerPlugin(config *common.Config) management.FactoryFunc { + c := defaultConfig() + if config.Enabled() { + if err := config.Unpack(&c); err != nil { + return nil + } + + if c.Mode == xmanagement.ModeFleet { + return NewFleetManager + } + } + + return nil +} diff --git a/x-pack/libbeat/management/manager.go b/x-pack/libbeat/management/manager.go index 6697440bbceb..2dd7c1a23a09 100644 --- a/x-pack/libbeat/management/manager.go +++ b/x-pack/libbeat/management/manager.go @@ -10,7 +10,6 @@ import ( "time" "github.com/elastic/beats/libbeat/common/reload" - "github.com/elastic/beats/libbeat/feature" "github.com/gofrs/uuid" @@ -26,10 +25,6 @@ import ( var errEmptyAccessToken = errors.New("access_token is empty, you must reenroll your Beat") -func init() { - management.Register("x-pack", NewConfigManager, feature.Beta) -} - // ConfigManager handles internal config updates. By retrieving // new configs from Kibana and applying them to the Beat type ConfigManager struct { @@ -284,7 +279,7 @@ func (cm *ConfigManager) reload(t string, blocks []*api.ConfigBlock) *Error { if len(blocks) > 1 { err := fmt.Errorf("got an invalid number of configs for %s: %d, expected: 1", t, len(blocks)) cm.logger.Error(err) - return newConfigError(err) + return NewConfigError(err) } var config *reload.ConfigWithMeta @@ -293,13 +288,13 @@ func (cm *ConfigManager) reload(t string, blocks []*api.ConfigBlock) *Error { config, err = blocks[0].ConfigWithMeta() if err != nil { cm.logger.Error(err) - return newConfigError(err) + return NewConfigError(err) } } if err := obj.Reload(config); err != nil { cm.logger.Error(err) - return newConfigError(err) + return NewConfigError(err) } } else if obj := cm.registry.GetReloadableList(t); obj != nil { // List @@ -308,14 +303,14 @@ func (cm *ConfigManager) reload(t string, blocks []*api.ConfigBlock) *Error { config, err := block.ConfigWithMeta() if err != nil { cm.logger.Error(err) - return newConfigError(err) + return NewConfigError(err) } configs = append(configs, config) } if err := obj.Reload(configs); err != nil { cm.logger.Error(err) - return newConfigError(err) + return NewConfigError(err) } } diff --git a/x-pack/libbeat/management/manager_test.go b/x-pack/libbeat/management/manager_test.go index 1ada01bfdb36..af8cf0f10ee0 100644 --- a/x-pack/libbeat/management/manager_test.go +++ b/x-pack/libbeat/management/manager_test.go @@ -73,6 +73,7 @@ func TestConfigManager(t *testing.T) { config := &Config{ Enabled: true, + Mode: ModeCentralManagement, Period: 100 * time.Millisecond, Kibana: c, AccessToken: accessToken, @@ -148,6 +149,7 @@ func TestRemoveItems(t *testing.T) { config := &Config{ Enabled: true, + Mode: ModeCentralManagement, Period: 100 * time.Millisecond, Kibana: c, AccessToken: accessToken, @@ -225,6 +227,7 @@ func TestUnEnroll(t *testing.T) { config := &Config{ Enabled: true, + Mode: ModeCentralManagement, Period: 100 * time.Millisecond, Kibana: c, AccessToken: accessToken, @@ -299,6 +302,7 @@ func TestBadConfig(t *testing.T) { config := &Config{ Enabled: true, + Mode: ModeCentralManagement, Period: 100 * time.Millisecond, Kibana: c, AccessToken: accessToken, diff --git a/x-pack/libbeat/management/plugin.go b/x-pack/libbeat/management/plugin.go new file mode 100644 index 000000000000..2df666ff1663 --- /dev/null +++ b/x-pack/libbeat/management/plugin.go @@ -0,0 +1,31 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package management + +import ( + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/feature" + "github.com/elastic/beats/libbeat/management" +) + +func init() { + management.Register("x-pack", NewManagerPlugin, feature.Beta) +} + +// NewManagerPlugin creates a plugin function returning factory if configuration matches the criteria +func NewManagerPlugin(config *common.Config) management.FactoryFunc { + c := defaultConfig() + if config.Enabled() { + if err := config.Unpack(&c); err != nil { + return nil + } + + if c.Mode == ModeCentralManagement { + return NewConfigManager + } + } + + return nil +} diff --git a/x-pack/metricbeat/metricbeat.spec b/x-pack/metricbeat/metricbeat.spec new file mode 100644 index 000000000000..48c6b26332a7 --- /dev/null +++ b/x-pack/metricbeat/metricbeat.spec @@ -0,0 +1 @@ +{"BinaryPath":"metricbeat","Args":["-e"],"Configurable":"grpc"}