Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
424 changes: 212 additions & 212 deletions NOTICE.txt

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ require (
go.elastic.co/ecszap v1.0.2
go.elastic.co/go-licence-detector v0.7.0
go.opentelemetry.io/collector/component/componentstatus v0.120.0
go.opentelemetry.io/collector/pipeline v0.120.0
go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.119.0
go.opentelemetry.io/collector/receiver/nopreceiver v0.119.0
go.uber.org/zap v1.27.0
Expand Down Expand Up @@ -590,7 +591,6 @@ require (
go.opentelemetry.io/collector/pdata v1.26.0 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.120.0 // indirect
go.opentelemetry.io/collector/pdata/testdata v0.120.0 // indirect
go.opentelemetry.io/collector/pipeline v0.120.0 // indirect
go.opentelemetry.io/collector/pipeline/xpipeline v0.120.0 // indirect
go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper v0.120.0 // indirect
go.opentelemetry.io/collector/processor/processortest v0.120.0 // indirect
Expand Down
138 changes: 119 additions & 19 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"sync/atomic"
"time"

"github.com/elastic/elastic-agent/internal/pkg/otel/configtranslate"

"go.opentelemetry.io/collector/component/componentstatus"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
Expand Down Expand Up @@ -221,6 +223,12 @@ type Coordinator struct {

otelMgr OTelManager
otelCfg *confmap.Conf
// the final config sent to the manager, contains both config from hybrid mode and from components
finalOtelCfg *confmap.Conf

// This variable controls whether we run supported components in the Otel manager instead of the runtime manager.
// It's a temporary measure until we decide exactly how we want to control where specific components run.
runComponentsInOtelManager bool

caps capabilities.Capabilities
modifiers []ComponentsModifier
Expand Down Expand Up @@ -388,21 +396,22 @@ func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp.
LogLevel: logLevel,
}
c := &Coordinator{
logger: logger,
cfg: cfg,
agentInfo: agentInfo,
isManaged: isManaged,
specs: specs,
reexecMgr: reexecMgr,
upgradeMgr: upgradeMgr,
monitorMgr: monitorMgr,
runtimeMgr: runtimeMgr,
configMgr: configMgr,
varsMgr: varsMgr,
otelMgr: otelMgr,
caps: caps,
modifiers: modifiers,
state: state,
logger: logger,
cfg: cfg,
agentInfo: agentInfo,
isManaged: isManaged,
specs: specs,
reexecMgr: reexecMgr,
upgradeMgr: upgradeMgr,
monitorMgr: monitorMgr,
runtimeMgr: runtimeMgr,
configMgr: configMgr,
varsMgr: varsMgr,
otelMgr: otelMgr,
runComponentsInOtelManager: false, // change this to run supported components in the Otel manager
caps: caps,
modifiers: modifiers,
state: state,
// Note: the uses of a buffered input channel in our broadcaster (the
// third parameter to broadcaster.New) means that it is possible for
// immediately adjacent writes/reads not to match, e.g.:
Expand Down Expand Up @@ -781,7 +790,7 @@ func (c *Coordinator) Run(ctx context.Context) error {
// information about the state of the Elastic Agent.
// Called by external goroutines.
func (c *Coordinator) DiagnosticHooks() diagnostics.Hooks {
return diagnostics.Hooks{
hooks := diagnostics.Hooks{
{
Name: "agent-info",
Filename: "agent-info.yaml",
Expand Down Expand Up @@ -1023,6 +1032,26 @@ func (c *Coordinator) DiagnosticHooks() diagnostics.Hooks {
},
},
}
if c.runComponentsInOtelManager {
otelComponentHook := diagnostics.Hook{
Name: "otel-final",
Filename: "otel-final.yaml",
Description: "Final otel configuration used by the Elastic Agent. Includes hybrid mode config and component config.",
ContentType: "application/yaml",
Hook: func(_ context.Context) []byte {
if c.finalOtelCfg == nil {
return []byte("no active OTel configuration")
}
o, err := yaml.Marshal(c.finalOtelCfg.ToStringMap())
if err != nil {
return []byte(fmt.Sprintf("error: failed to convert to yaml: %v", err))
}
return o
},
}
hooks = append(hooks, otelComponentHook)
}
return hooks
}

// runner performs the actual work of running all the managers.
Expand Down Expand Up @@ -1234,7 +1263,6 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) {
// processConfig handles a new agent configuration. When an OTel manager is
// set on the Coordinator, the OTel section of cfg is stored in c.otelCfg;
// the config is then handed to processConfigAgent, whose error (if any) is
// returned to the caller.
func (c *Coordinator) processConfig(ctx context.Context, cfg *config.Config) (err error) {
if c.otelMgr != nil {
c.otelCfg = cfg.OTel
// NOTE(review): this text was taken from a rendered diff without +/- markers.
// The hunk header (-1234,7 +1263,6) suggests this direct Update call is the
// line being removed -- confirm against the merged file.
c.otelMgr.Update(cfg.OTel)
}
return c.processConfigAgent(ctx, cfg)
}
Expand Down Expand Up @@ -1420,17 +1448,89 @@ func (c *Coordinator) refreshComponentModel(ctx context.Context) (err error) {
c.logger.Debugf("Continue with missing \"signed\" properties: %v", err)
}

model := component.Model{
model := &component.Model{
Components: c.componentModel,
Signed: signed,
}

c.logger.Info("Updating running component model")
c.logger.With("components", model.Components).Debug("Updating running component model")
c.runtimeMgr.Update(model)
return c.updateManagersWithConfig(model)
}

// updateManagersWithConfig distributes the component model across the
// managers. splitModelBetweenManagers decides which components go where; the
// runtime manager receives its share as a model update, and the remainder is
// passed to updateOtelManagerConfig, whose result is returned.
func (c *Coordinator) updateManagersWithConfig(model *component.Model) error {
	forRuntime, forOtel := c.splitModelBetweenManagers(model)

	runtimeLog := c.logger.With("components", forRuntime.Components)
	runtimeLog.Debug("Updating runtime manager model")

	c.runtimeMgr.Update(*forRuntime)
	return c.updateOtelManagerConfig(forOtel)
}

// updateOtelManagerConfig assembles the collector configuration for the otel
// manager and sends it. The configuration is built by merging, in this order:
//
//  1. the config translated from the components in model (only when model has
//     at least one component), and
//  2. the hybrid-mode otel config stored on the Coordinator (c.otelCfg).
//
// Translation and merge failures are logged and otherwise ignored, so the
// manager still receives whatever could be assembled. If the assembled config
// has no keys, nil is sent instead. The value sent to the manager is also
// recorded in c.finalOtelCfg. The returned error is always nil.
func (c *Coordinator) updateOtelManagerConfig(model *component.Model) error {
	finalOtelCfg := confmap.New()

	var componentOtelCfg *confmap.Conf
	if len(model.Components) > 0 {
		var err error
		c.logger.With("components", model.Components).Debug("Updating otel manager model")
		componentOtelCfg, err = configtranslate.GetOtelConfig(model, c.agentInfo)
		if err != nil {
			c.logger.Errorf("failed to generate otel config: %v", err)
		}
	}

	if componentOtelCfg != nil {
		// Errorf (not Error) so the %v verb is actually expanded in the log line.
		if err := finalOtelCfg.Merge(componentOtelCfg); err != nil {
			c.logger.Errorf("failed to merge otel config: %v", err)
		}
	}

	if c.otelCfg != nil {
		if err := finalOtelCfg.Merge(c.otelCfg); err != nil {
			c.logger.Errorf("failed to merge otel config: %v", err)
		}
	}

	if len(finalOtelCfg.AllKeys()) == 0 {
		// if the config is empty, we want to send nil to the manager, so it knows to stop the collector
		finalOtelCfg = nil
	}

	c.otelMgr.Update(finalOtelCfg)
	c.finalOtelCfg = finalOtelCfg
	return nil
}

// splitModelBetweenManagers partitions the components of model into the share
// handled by the runtime manager and the share handled by the otel manager.
//
// When runComponentsInOtelManager is false (the default), the runtime manager
// gets model unchanged and the otel model is empty. Otherwise every component
// for which configtranslate.IsComponentOtelSupported reports true goes to the
// otel model, and the rest go to a new runtime model that also carries over
// model.Signed.
func (c *Coordinator) splitModelBetweenManagers(model *component.Model) (runtimeModel *component.Model, otelModel *component.Model) {
	if !c.runComponentsInOtelManager {
		// Runtime manager gets all the components, this is the default
		return model, &component.Model{}
	}

	var forRuntime, forOtel []component.Component
	for _, comp := range model.Components {
		if !configtranslate.IsComponentOtelSupported(&comp) {
			forRuntime = append(forRuntime, comp)
			continue
		}
		forOtel = append(forOtel, comp)
	}

	runtimeModel = &component.Model{
		Components: forRuntime,
		Signed:     model.Signed,
	}
	// the signed portion of the policy is only used by Defend, so otel doesn't need it for anything
	otelModel = &component.Model{Components: forOtel}
	return runtimeModel, otelModel
}

// generateComponentModel regenerates the configuration tree and
// components from the current AST and vars and returns the result.
// Called from both the main Coordinator goroutine and from external
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ func TestCoordinatorReportsInvalidPolicy(t *testing.T) {
upgradeMgr: upgradeMgr,
// Add a placeholder runtime manager that will accept any updates
runtimeMgr: &fakeRuntimeManager{},
otelMgr: &fakeOTelManager{},

// Set valid but empty initial values for ast and vars
vars: emptyVars(t),
Expand Down Expand Up @@ -583,6 +584,7 @@ func TestCoordinatorReportsComponentModelError(t *testing.T) {
},
// Add a placeholder runtime manager that will accept any updates
runtimeMgr: &fakeRuntimeManager{},
otelMgr: &fakeOTelManager{},

// Set valid but empty initial values for ast and vars
vars: emptyVars(t),
Expand Down Expand Up @@ -681,6 +683,7 @@ func TestCoordinatorPolicyChangeUpdatesMonitorReloader(t *testing.T) {
configManagerUpdate: configChan,
},
runtimeMgr: runtimeManager,
otelMgr: &fakeOTelManager{},
vars: emptyVars(t),
componentPIDTicker: time.NewTicker(time.Second * 30),
}
Expand Down Expand Up @@ -921,6 +924,141 @@ service:
assert.Nil(t, otelConfig, "empty policy should cause otel manager to get nil config")
}

// TestCoordinatorPolicyChangeUpdatesRuntimeAndOTelManagerWithOtelComponents
// checks that, with runComponentsInOtelManager enabled, a single policy change
// results in an update to both the runtime manager and the otel manager, and
// that only the system/metrics component reaches the runtime manager.
func TestCoordinatorPolicyChangeUpdatesRuntimeAndOTelManagerWithOtelComponents(t *testing.T) {
// Send a test policy to the Coordinator as a Config Manager update,
// verify it generates the right component model and sends components
// to both the runtime manager and the otel manager.

// Set a one-second timeout -- nothing here should block, but if it
// does let's report a failure instead of timing out the test runner.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
logger := logp.NewLogger("testing")

configChan := make(chan ConfigChange, 1)

// Create a mocked runtime manager that will report the update call
var updated bool // Set by runtime manager callback
var components []component.Component // Set by runtime manager callback
runtimeManager := &fakeRuntimeManager{
updateCallback: func(comp []component.Component) error {
updated = true
components = comp
return nil
},
}
// Create a mocked otel manager that records the config it is given
var otelUpdated bool // Set by otel manager callback
var otelConfig *confmap.Conf // Set by otel manager callback
otelManager := &fakeOTelManager{
updateCallback: func(cfg *confmap.Conf) error {
otelUpdated = true
otelConfig = cfg
return nil
},
}

// we need the filestream spec to be able to convert to Otel config
componentSpec := component.InputRuntimeSpec{
InputType: "filestream",
BinaryName: "agentbeat",
Spec: component.InputSpec{
Name: "filestream",
Command: &component.CommandSpec{
Args: []string{"filebeat"},
},
Platforms: []string{
"linux/amd64",
"linux/arm64",
"darwin/amd64",
"darwin/arm64",
"windows/amd64",
"container/amd64",
"container/arm64",
},
},
}

// Only the filestream spec is registered; system/metrics deliberately has none.
platform, err := component.LoadPlatformDetail()
require.NoError(t, err)
specs, err := component.NewRuntimeSpecs(platform, []component.InputRuntimeSpec{componentSpec})
require.NoError(t, err)

coord := &Coordinator{
logger: logger,
agentInfo: &info.AgentInfo{},
stateBroadcaster: broadcaster.New(State{}, 0, 0),
managerChans: managerChans{
configManagerUpdate: configChan,
},
runtimeMgr: runtimeManager,
otelMgr: otelManager,
// Enable splitting of components between the runtime and otel managers.
runComponentsInOtelManager: true,
specs: specs,
vars: emptyVars(t),
componentPIDTicker: time.NewTicker(time.Second * 30),
}

// Create a policy with two inputs (filestream and system/metrics) sharing
// one elasticsearch output, plus an otel section (receivers, exporters and
// a traces pipeline).
cfg := config.MustNewConfigFrom(`
outputs:
default:
type: elasticsearch
hosts:
- localhost:9200
inputs:
- id: test-input
type: filestream
use_output: default
- id: test-other-input
type: system/metrics
use_output: default
receivers:
nop:
exporters:
nop:
service:
pipelines:
traces:
receivers:
- nop
exporters:
- nop
`)

// Send the policy change and make sure it was acknowledged.
cfgChange := &configChange{cfg: cfg}
configChan <- cfgChange
coord.runLoopIteration(ctx)
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")

// Make sure both managers received an update. Only one component is
// expected at the runtime manager.
// An assert.Equal on the full component model doesn't play nice with
// the embedded proto structs, so instead we verify the important fields
// manually (sorry).
assert.True(t, updated, "Runtime manager should be updated after a policy change")
require.Equal(t, 1, len(components), "Test policy should generate one component")
assert.True(t, otelUpdated, "OTel manager should be updated after a policy change")
require.NotNil(t, otelConfig, "OTel manager should have config")

// The runtime component is the system/metrics one; it has no registered
// spec in this test, so it is expected to carry an "input not supported" error.
runtimeComponent := components[0]
assert.Equal(t, "system/metrics-default", runtimeComponent.ID)
require.NotNil(t, runtimeComponent.Err, "Input with no spec should produce a component error")
assert.Equal(t, "input not supported", runtimeComponent.Err.Error(), "Input with no spec should report 'input not supported'")
require.Equal(t, 2, len(runtimeComponent.Units))

units := runtimeComponent.Units
// Verify the input unit
assert.Equal(t, "system/metrics-default-test-other-input", units[0].ID)
assert.Equal(t, client.UnitTypeInput, units[0].Type)
assert.Equal(t, "test-other-input", units[0].Config.Id)
assert.Equal(t, "system/metrics", units[0].Config.Type)

// Verify the output unit
assert.Equal(t, "system/metrics-default", units[1].ID)
assert.Equal(t, client.UnitTypeOutput, units[1].Type)
assert.Equal(t, "elasticsearch", units[1].Config.Type)
}

func TestCoordinatorReportsRuntimeManagerUpdateFailure(t *testing.T) {
// Set a one-second timeout -- nothing here should block, but if it
// does let's report a failure instead of timing out the test runner.
Expand Down Expand Up @@ -950,7 +1088,9 @@ func TestCoordinatorReportsRuntimeManagerUpdateFailure(t *testing.T) {
// manager, so it receives the update result.
runtimeManagerError: updateErrChan,
},
runtimeMgr: runtimeManager,
runtimeMgr: runtimeManager,
otelMgr: &fakeOTelManager{},

vars: emptyVars(t),
componentPIDTicker: time.NewTicker(time.Second * 30),
}
Expand Down Expand Up @@ -1075,6 +1215,7 @@ func TestCoordinatorAppliesVarsToPolicy(t *testing.T) {
varsManagerUpdate: varsChan,
},
runtimeMgr: runtimeManager,
otelMgr: &fakeOTelManager{},
vars: emptyVars(t),
componentPIDTicker: time.NewTicker(time.Second * 30),
}
Expand Down
Loading
Loading