Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
498 changes: 249 additions & 249 deletions NOTICE.txt

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,12 @@ require (
go.elastic.co/ecszap v1.0.2
go.elastic.co/go-licence-detector v0.7.0
go.opentelemetry.io/collector/component/componentstatus v0.119.0
go.opentelemetry.io/collector/pipeline v0.119.0
go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.119.0
go.opentelemetry.io/collector/receiver/nopreceiver v0.119.0
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.32.0
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56
golang.org/x/net v0.34.0
golang.org/x/sync v0.10.0
golang.org/x/sys v0.29.0
Expand Down Expand Up @@ -578,7 +580,6 @@ require (
go.opentelemetry.io/collector/pdata v1.25.0 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.119.0 // indirect
go.opentelemetry.io/collector/pdata/testdata v0.119.0 // indirect
go.opentelemetry.io/collector/pipeline v0.119.0 // indirect
go.opentelemetry.io/collector/pipeline/xpipeline v0.119.0 // indirect
go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper v0.119.0 // indirect
go.opentelemetry.io/collector/processor/processortest v0.119.0 // indirect
Expand Down Expand Up @@ -617,7 +618,6 @@ require (
go.starlark.net v0.0.0-20230525235612-a134d8f9ddca // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/mod v0.21.0 // indirect
golang.org/x/oauth2 v0.24.0 // indirect
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
Expand Down
138 changes: 119 additions & 19 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"sync/atomic"
"time"

"github.com/elastic/elastic-agent/internal/pkg/otel/configtranslate"

"go.opentelemetry.io/collector/component/componentstatus"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
Expand Down Expand Up @@ -217,6 +219,12 @@ type Coordinator struct {

otelMgr OTelManager
otelCfg *confmap.Conf
// the final config sent to the manager, contains both config from hybrid mode and from components
finalOtelCfg *confmap.Conf

// This variable controls whether we run supported components in the Otel manager instead of the runtime manager.
// It's a temporary measure until we decide exactly how we want to control where specific components run.
runComponentsInOtelManager bool

caps capabilities.Capabilities
modifiers []ComponentsModifier
Expand Down Expand Up @@ -384,21 +392,22 @@ func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp.
LogLevel: logLevel,
}
c := &Coordinator{
logger: logger,
cfg: cfg,
agentInfo: agentInfo,
isManaged: isManaged,
specs: specs,
reexecMgr: reexecMgr,
upgradeMgr: upgradeMgr,
monitorMgr: monitorMgr,
runtimeMgr: runtimeMgr,
configMgr: configMgr,
varsMgr: varsMgr,
otelMgr: otelMgr,
caps: caps,
modifiers: modifiers,
state: state,
logger: logger,
cfg: cfg,
agentInfo: agentInfo,
isManaged: isManaged,
specs: specs,
reexecMgr: reexecMgr,
upgradeMgr: upgradeMgr,
monitorMgr: monitorMgr,
runtimeMgr: runtimeMgr,
configMgr: configMgr,
varsMgr: varsMgr,
otelMgr: otelMgr,
runComponentsInOtelManager: false, // change this to run supported components in the Otel manager
caps: caps,
modifiers: modifiers,
state: state,
// Note: the uses of a buffered input channel in our broadcaster (the
// third parameter to broadcaster.New) means that it is possible for
// immediately adjacent writes/reads not to match, e.g.:
Expand Down Expand Up @@ -775,7 +784,7 @@ func (c *Coordinator) Run(ctx context.Context) error {
// information about the state of the Elastic Agent.
// Called by external goroutines.
func (c *Coordinator) DiagnosticHooks() diagnostics.Hooks {
return diagnostics.Hooks{
hooks := diagnostics.Hooks{
{
Name: "agent-info",
Filename: "agent-info.yaml",
Expand Down Expand Up @@ -1016,6 +1025,26 @@ func (c *Coordinator) DiagnosticHooks() diagnostics.Hooks {
},
},
}
if c.runComponentsInOtelManager {
otelComponentHook := diagnostics.Hook{
Name: "otel-final",
Filename: "otel-final.yaml",
Description: "Final otel configuration used by the Elastic Agent. Includes hybrid mode config and component config.",
ContentType: "application/yaml",
Hook: func(_ context.Context) []byte {
if c.finalOtelCfg == nil {
return []byte("no active OTel configuration")
}
o, err := yaml.Marshal(c.finalOtelCfg.ToStringMap())
if err != nil {
return []byte(fmt.Sprintf("error: failed to convert to yaml: %v", err))
}
return o
},
}
hooks = append(hooks, otelComponentHook)
}
return hooks
}

// runner performs the actual work of running all the managers.
Expand Down Expand Up @@ -1227,7 +1256,6 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) {
func (c *Coordinator) processConfig(ctx context.Context, cfg *config.Config) (err error) {
if c.otelMgr != nil {
c.otelCfg = cfg.OTel
c.otelMgr.Update(cfg.OTel)
}
return c.processConfigAgent(ctx, cfg)
}
Expand Down Expand Up @@ -1413,17 +1441,89 @@ func (c *Coordinator) refreshComponentModel(ctx context.Context) (err error) {
c.logger.Debugf("Continue with missing \"signed\" properties: %v", err)
}

model := component.Model{
model := &component.Model{
Components: c.componentModel,
Signed: signed,
}

c.logger.Info("Updating running component model")
c.logger.With("components", model.Components).Debug("Updating running component model")
c.runtimeMgr.Update(model)
return c.updateManagersWithConfig(model)
}

// updateManagersWithConfig updates runtime managers with the component model and config.
// Components may be sent to different runtimes depending on various criteria.
func (c *Coordinator) updateManagersWithConfig(model *component.Model) error {
runtimeModel, otelModel := c.splitModelBetweenManagers(model)
c.logger.With("components", runtimeModel.Components).Debug("Updating runtime manager model")
c.runtimeMgr.Update(*runtimeModel)
return c.updateOtelManagerConfig(otelModel)
}

// updateOtelManagerConfig updates the otel collector configuration for the otel manager. It assembles this configuration
// from the component model passed in and from the hybrid-mode otel config set on the Coordinator.
func (c *Coordinator) updateOtelManagerConfig(model *component.Model) error {
finalOtelCfg := confmap.New()
var componentOtelCfg *confmap.Conf
if len(model.Components) > 0 {
var err error
c.logger.With("components", model.Components).Debug("Updating otel manager model")
componentOtelCfg, err = configtranslate.GetOtelConfig(model, c.agentInfo)
if err != nil {
c.logger.Errorf("failed to generate otel config: %v", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eventually, this error (perhaps any error in this chain) should change the overall agent status to failed or degraded. That will make these failures much more obvious and visible in Fleet.

}
}
if componentOtelCfg != nil {
err := finalOtelCfg.Merge(componentOtelCfg)
if err != nil {
c.logger.Error("failed to merge otel config: %v", err)
}
}

if c.otelCfg != nil {
err := finalOtelCfg.Merge(c.otelCfg)
if err != nil {
c.logger.Error("failed to merge otel config: %v", err)
}
}

if len(finalOtelCfg.AllKeys()) == 0 {
// if the config is empty, we want to send nil to the manager, so it knows to stop the collector
finalOtelCfg = nil
}

c.otelMgr.Update(finalOtelCfg)
c.finalOtelCfg = finalOtelCfg
return nil
}

// splitModelBetweenManager splits the model components between the runtime manager and the otel manager.
func (c *Coordinator) splitModelBetweenManagers(model *component.Model) (runtimeModel *component.Model, otelModel *component.Model) {
if !c.runComponentsInOtelManager {
// Runtime manager gets all the components, this is the default
otelModel = &component.Model{}
runtimeModel = model
return
}
var otelComponents, runtimeComponents []component.Component
for _, comp := range model.Components {
if configtranslate.IsComponentOtelSupported(&comp) {
otelComponents = append(otelComponents, comp)
} else {
runtimeComponents = append(runtimeComponents, comp)
}
}
otelModel = &component.Model{
Components: otelComponents,
// the signed portion of the policy is only used by Defend, so otel doesn't need it for anything
}
runtimeModel = &component.Model{
Components: runtimeComponents,
Signed: model.Signed,
}
return
}

// generateComponentModel regenerates the configuration tree and
// components from the current AST and vars and returns the result.
// Called from both the main Coordinator goroutine and from external
Expand Down
Loading