Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix field names with `add_network_direction` processor. {issue}29747[29747] {pull}29751[29751]
- Fix the ability for subcommands to be ran properly from the beats containers. {pull}30452[30452]
- Log errors when parsing and applying config blocks and if the input is disabled. {pull}30534[30534]
- Ensure that the Reloadable part of beats are initialized before the Manager is started. {issue}30533[30533]
- Fixes Beats crashing when glibc >= 2.35 is used {issue}30576[30576]
- Fix dissect trim panics from DELETE (127)(\u007f) character {issue}30657[30657] {pull}30658[30658]

Expand Down
8 changes: 8 additions & 0 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,11 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
}
adiscover.Start()

// We start the manager when all the subsystem are initialized and ready to received events.
if err := b.Manager.Start(); err != nil {
return err
}

// Add done channel to wait for shutdown signal
waitFinished.AddChan(fb.done)
waitFinished.Wait()
Expand Down Expand Up @@ -537,6 +542,9 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
}
}

// Stop the manager and stop the connection to any dependent services.
b.Manager.Stop()

return nil
}

Expand Down
6 changes: 6 additions & 0 deletions heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ func (bt *Heartbeat) Run(b *beat.Beat) error {
return err
}
}
// Configure the beats Manager to start after all the reloadable hooks are initialized
// and shutdown when the function return.
if err := b.Manager.Start(); err != nil {
return err
}
defer b.Manager.Stop()

if bt.config.Autodiscover != nil {
bt.autodiscover, err = bt.makeAutodiscover(b)
Expand Down
5 changes: 2 additions & 3 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,9 +491,8 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error {

logp.Info("%s start running.", b.Info.Beat)

// Launch config manager
b.Manager.Start(beater.Stop)
defer b.Manager.Stop()
// Allow the manager to stop a currently running beats out of bound.
b.Manager.SetStopCallback(beater.Stop)

return beater.Run(&b.Beat)
}
Expand Down
40 changes: 29 additions & 11 deletions libbeat/management/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,22 +74,38 @@ type Manager interface {
// Enabled returns true if manager is enabled.
Enabled() bool

// Start the config manager giving it a stopFunc callback
// so the beat can be told when to stop.
Start(stopFunc func())

// Stop the config manager.
// Start needs to invoked when the system is ready to receive an external configuration and
// also ready to start ingesting new events. The manager expects that all the reloadable and
// reloadable list are fixed for the whole lifetime of the manager.
//
// Notes: Adding dynamically new reloadable hooks at runtime can lead to inconsistency in the
// execution.
Start() error

// Stop when this method is called, the manager will stop receiving new actions, no more action
// will be propagated to the handlers and will not try to configure any reloadable parts.
// When the manager is stop the callback will be called to signal that the system can terminate.
//
// Calls to 'CheckRawConfig()' or 'SetPayload()' will be ignored after calling stop.
//
// Note: Stop will not call 'UnregisterAction()' automaticallty.
Stop()

// SetStopCallback accepts a function that need to be called when the manager want to shutdown the
// beats. This is needed when you want your beats to be gracefully shutdown remotely by the Elastic Agent
// when a policy doesn't need to run this beat.
SetStopCallback(f func())

// CheckRawConfig check settings are correct before launching the beat.
CheckRawConfig(cfg *common.Config) error

// RegisterAction registers action handler with the client
RegisterAction(action client.Action)

// UnregisterAction unregisters action handler with the client
UnregisterAction(action client.Action)

// SetPayload sets the client payload
// SetPayload Allows to add additional metadata to future requests made by the manager.
SetPayload(map[string]interface{})
}

Expand Down Expand Up @@ -136,10 +152,11 @@ func defaultModeConfig() *modeConfig {

// nilManager, fallback when no manager is present
type nilManager struct {
logger *logp.Logger
lock sync.Mutex
status Status
msg string
logger *logp.Logger
lock sync.Mutex
status Status
msg string
stopFunc func()
}

func nilFactory(*common.Config, *reload.Registry, uuid.UUID) (Manager, error) {
Expand All @@ -151,8 +168,9 @@ func nilFactory(*common.Config, *reload.Registry, uuid.UUID) (Manager, error) {
}, nil
}

func (*nilManager) SetStopCallback(func()) {}
func (*nilManager) Enabled() bool { return false }
func (*nilManager) Start(_ func()) {}
func (*nilManager) Start() error { return nil }
func (*nilManager) Stop() {}
func (*nilManager) CheckRawConfig(cfg *common.Config) error { return nil }
func (n *nilManager) UpdateStatus(status Status, msg string) {
Expand Down
8 changes: 8 additions & 0 deletions metricbeat/beater/metricbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,13 @@ func (bt *Metricbeat) Run(b *beat.Beat) error {
modules.Stop()
}()

// Start the manager after all the reload hooks are configured,
// the Manager is stopped at the end of the execution.
if err := b.Manager.Start(); err != nil {
return err
}
defer b.Manager.Stop()

// Dynamic file based modules (metricbeat.config.modules)
if bt.config.ConfigModules.Enabled() {
moduleReloader := cfgfile.NewReloader(b.Publisher, bt.config.ConfigModules)
Expand Down Expand Up @@ -256,6 +263,7 @@ func (bt *Metricbeat) Run(b *beat.Beat) error {
}

wg.Wait()

return nil
}

Expand Down
13 changes: 11 additions & 2 deletions packetbeat/beater/packetbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,19 @@ func (pb *packetbeat) runStatic(b *beat.Beat, factory *processorFactory) error {
func (pb *packetbeat) runManaged(b *beat.Beat, factory *processorFactory) error {
runner := newReloader(management.DebugK, factory, b.Publisher)
reload.Register.MustRegisterList("inputs", runner)
defer runner.Stop()

logp.Debug("main", "Waiting for the runner to finish")

// Start the manager after all the hooks are registered and terminates when
// the function return.
if err := b.Manager.Start(); err != nil {
return err
}

defer func() {
runner.Stop()
b.Manager.Stop()
}()

for {
select {
case <-pb.done:
Expand Down
30 changes: 22 additions & 8 deletions x-pack/libbeat/management/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,27 +96,36 @@ func (cm *Manager) Enabled() bool {
return cm.config.Enabled
}

// Start the config manager
func (cm *Manager) Start(stopFunc func()) {
if !cm.Enabled() {
return
}
// SetStopCallback sets the callback to run when the manager want to shutdown the beats gracefully.
func (cm *Manager) SetStopCallback(stopFunc func()) {
cm.lock.Lock()
defer cm.lock.Unlock()
cm.stopFunc = stopFunc
}

// Start the config manager.
func (cm *Manager) Start() error {
cm.lock.Lock()
defer cm.lock.Unlock()

if !cm.Enabled() {
return nil
}

cfgwarn.Beta("Fleet management is enabled")
cm.logger.Info("Starting fleet management service")

cm.stopFunc = stopFunc
cm.isRunning = true
err := cm.client.Start(context.Background())
if err != nil {
cm.logger.Errorf("failed to start elastic-agent-client: %s", err)
return err
}
cm.logger.Info("Ready to receive configuration")
return nil
}

// Stop the config manager
// Stop stops the current Manager and close the connection to Elastic Agent.
func (cm *Manager) Stop() {
cm.lock.Lock()
defer cm.lock.Unlock()
Expand All @@ -133,6 +142,8 @@ func (cm *Manager) Stop() {
// CheckRawConfig check settings are correct to start the beat. This method
// checks there are no collision between the existing configuration and what
// fleet management can configure.
//
// NOTE: This is currently not implemented for fleet.
func (cm *Manager) CheckRawConfig(cfg *common.Config) error {
// TODO implement this method
return nil
Expand Down Expand Up @@ -217,6 +228,9 @@ func (cm *Manager) SetPayload(payload map[string]interface{}) {
}

func (cm *Manager) OnStop() {
cm.lock.Lock()
defer cm.lock.Unlock()

if cm.stopFunc != nil {
cm.client.Status(proto.StateObserved_STOPPING, "Stopping", nil)
cm.stopFunc()
Expand Down Expand Up @@ -320,7 +334,7 @@ func (cm *Manager) toConfigBlocks(cfg common.MapStr) (ConfigBlocks, error) {
for _, regName := range cm.registry.GetRegisteredNames() {
iBlock, err := cfg.GetValue(regName)
if err != nil {
cm.logger.Errorf("failed to get '%s' from config: %v. Continuing to next one", regName, err)
cm.logger.Warnf("failed to get '%s' from config: %v. Continuing to next one", regName, err)
continue
}

Expand Down
7 changes: 7 additions & 0 deletions x-pack/osquerybeat/beater/osquerybeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,13 @@ func (bt *osquerybeat) Run(b *beat.Beat) error {
runner.Update(ctx, bt.config.Inputs)
}

// Ensure that all the hooks and actions are ready before starting the Manager
// to receive configuration.
if err := b.Manager.Start(); err != nil {
return err
}
defer b.Manager.Stop()

// Set the osquery beat version to the manager payload. This allows the bundled osquery version to be reported to the stack.
bt.setManagerPayload(b)

Expand Down