Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 28 additions & 33 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,6 @@ type Coordinator struct {
// the final config sent to the manager, contains both config from hybrid mode and from components
finalOtelCfg *confmap.Conf

// This variable controls whether we run supported components in the Otel manager instead of the runtime manager.
// It's a temporary measure until we decide exactly how we want to control where specific components run.
runComponentsInOtelManager bool

caps capabilities.Capabilities
modifiers []ComponentsModifier

Expand Down Expand Up @@ -395,22 +391,21 @@ func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp.
LogLevel: logLevel,
}
c := &Coordinator{
logger: logger,
cfg: cfg,
agentInfo: agentInfo,
isManaged: isManaged,
specs: specs,
reexecMgr: reexecMgr,
upgradeMgr: upgradeMgr,
monitorMgr: monitorMgr,
runtimeMgr: runtimeMgr,
configMgr: configMgr,
varsMgr: varsMgr,
otelMgr: otelMgr,
runComponentsInOtelManager: false, // change this to run supported components in the Otel manager
caps: caps,
modifiers: modifiers,
state: state,
logger: logger,
cfg: cfg,
agentInfo: agentInfo,
isManaged: isManaged,
specs: specs,
reexecMgr: reexecMgr,
upgradeMgr: upgradeMgr,
monitorMgr: monitorMgr,
runtimeMgr: runtimeMgr,
configMgr: configMgr,
varsMgr: varsMgr,
otelMgr: otelMgr,
caps: caps,
modifiers: modifiers,
state: state,
// Note: the uses of a buffered input channel in our broadcaster (the
// third parameter to broadcaster.New) means that it is possible for
// immediately adjacent writes/reads not to match, e.g.:
Expand Down Expand Up @@ -1030,9 +1025,7 @@ func (c *Coordinator) DiagnosticHooks() diagnostics.Hooks {
return o
},
},
}
if c.runComponentsInOtelManager {
otelComponentHook := diagnostics.Hook{
diagnostics.Hook{
Name: "otel-final",
Filename: "otel-final.yaml",
Description: "Final otel configuration used by the Elastic Agent. Includes hybrid mode config and component config.",
Expand All @@ -1047,8 +1040,7 @@ func (c *Coordinator) DiagnosticHooks() diagnostics.Hooks {
}
return o
},
}
hooks = append(hooks, otelComponentHook)
},
}
return hooks
}
Expand Down Expand Up @@ -1478,6 +1470,11 @@ func (c *Coordinator) updateOtelManagerConfig(model *component.Model) error {
if err != nil {
c.logger.Errorf("failed to generate otel config: %v", err)
}
componentIDs := make([]string, 0, len(model.Components))
for _, comp := range model.Components {
componentIDs = append(componentIDs, comp.ID)
}
c.logger.With("component_ids", componentIDs).Warn("The Otel runtime manager is HIGHLY EXPERIMENTAL and only intended for testing. Use at your own risk.")
}
if componentOtelCfg != nil {
err := finalOtelCfg.Merge(componentOtelCfg)
Expand Down Expand Up @@ -1505,18 +1502,16 @@ func (c *Coordinator) updateOtelManagerConfig(model *component.Model) error {

// splitModelBetweenManager splits the model components between the runtime manager and the otel manager.
func (c *Coordinator) splitModelBetweenManagers(model *component.Model) (runtimeModel *component.Model, otelModel *component.Model) {
if !c.runComponentsInOtelManager {
// Runtime manager gets all the components, this is the default
otelModel = &component.Model{}
runtimeModel = model
return
}
var otelComponents, runtimeComponents []component.Component
for _, comp := range model.Components {
if configtranslate.IsComponentOtelSupported(&comp) {
switch comp.RuntimeManager {
case component.OtelRuntimeManager:
otelComponents = append(otelComponents, comp)
} else {
case component.ProcessRuntimeManager:
runtimeComponents = append(runtimeComponents, comp)
default:
// this should be impossible if we parse the configuration correctly
c.logger.Errorf("unknown runtime manager for component: %s, ignoring", comp.RuntimeManager)
}
}
otelModel = &component.Model{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -990,12 +990,11 @@ func TestCoordinatorPolicyChangeUpdatesRuntimeAndOTelManagerWithOtelComponents(t
managerChans: managerChans{
configManagerUpdate: configChan,
},
runtimeMgr: runtimeManager,
otelMgr: otelManager,
runComponentsInOtelManager: true,
specs: specs,
vars: emptyVars(t),
componentPIDTicker: time.NewTicker(time.Second * 30),
runtimeMgr: runtimeManager,
otelMgr: otelManager,
specs: specs,
vars: emptyVars(t),
componentPIDTicker: time.NewTicker(time.Second * 30),
}

// Create a policy with one input and one output (no otel configuration)
Expand All @@ -1009,6 +1008,7 @@ inputs:
- id: test-input
type: filestream
use_output: default
_runtime_experimental: otel
- id: test-other-input
type: system/metrics
use_output: default
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func TestCoordinatorExpectedDiagnosticHooks(t *testing.T) {
"components-actual",
"state",
"otel",
"otel-final",
}

coord := &Coordinator{}
Expand Down
126 changes: 83 additions & 43 deletions pkg/component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"encoding/json"
"errors"
"fmt"
"maps"
"slices"
"sort"
"strings"

Expand All @@ -30,11 +32,16 @@ type HeadersProvider interface {
Headers() map[string]string
}

type RuntimeManager string

const (
// defaultUnitLogLevel is the default log level that a unit will get if one is not defined.
defaultUnitLogLevel = client.UnitLogLevelInfo
headersKey = "headers"
elasticsearchType = "elasticsearch"
defaultUnitLogLevel = client.UnitLogLevelInfo
headersKey = "headers"
elasticsearchType = "elasticsearch"
ProcessRuntimeManager = RuntimeManager("process")
OtelRuntimeManager = RuntimeManager("otel")
DefaultRuntimeManager RuntimeManager = ProcessRuntimeManager
)

// ErrInputRuntimeCheckFail error is used when an input specification runtime prevention check occurs.
Expand Down Expand Up @@ -157,6 +164,8 @@ type Component struct {
// The logical output type, i.e. the type of output that was requested.
OutputType string `yaml:"output_type"`

RuntimeManager RuntimeManager `yaml:"-"`

// Units that should be running inside this component.
Units []Unit `yaml:"units"`

Expand Down Expand Up @@ -354,27 +363,37 @@ func (r *RuntimeSpecs) componentsForInputType(
componentErr = ErrOutputNotSupported
}

var units []Unit
unitsForRuntimeManager := make(map[RuntimeManager][]Unit)
for _, input := range output.inputs[inputType] {
if input.enabled {
unitID := fmt.Sprintf("%s-%s", componentID, input.id)
units = append(units, unitForInput(input, unitID))
unitsForRuntimeManager[input.runtimeManager] = append(
unitsForRuntimeManager[input.runtimeManager],
unitForInput(input, unitID),
)
}
}

if len(units) > 0 {
// Populate the output units for this component
units = append(units, unitForOutput(output, componentID))
components = append(components, Component{
ID: componentID,
Err: componentErr,
InputSpec: &inputSpec,
InputType: inputType,
OutputType: output.outputType,
Units: units,
Features: featureFlags.AsProto(),
Component: componentConfig.AsProto(),
})
// sort to ensure consistent order
runtimeManagers := slices.Collect(maps.Keys(unitsForRuntimeManager))
slices.Sort(runtimeManagers)
for _, runtimeManager := range runtimeManagers {
units := unitsForRuntimeManager[runtimeManager]
if len(units) > 0 {
// Populate the output units for this component
units = append(units, unitForOutput(output, componentID))
components = append(components, Component{
ID: componentID,
Err: componentErr,
InputSpec: &inputSpec,
InputType: inputType,
OutputType: output.outputType,
Units: units,
RuntimeManager: runtimeManager,
Features: featureFlags.AsProto(),
Component: componentConfig.AsProto(),
})
}
}
} else {
for _, input := range output.inputs[inputType] {
Expand All @@ -393,14 +412,15 @@ func (r *RuntimeSpecs) componentsForInputType(
// each component gets its own output, because of unit isolation
units = append(units, unitForOutput(output, componentID))
components = append(components, Component{
ID: componentID,
Err: componentErr,
InputSpec: &inputSpec,
InputType: inputType,
OutputType: output.outputType,
Units: units,
Features: featureFlags.AsProto(),
Component: componentConfig.AsProto(),
ID: componentID,
Err: componentErr,
InputSpec: &inputSpec,
InputType: inputType,
OutputType: output.outputType,
Units: units,
RuntimeManager: input.runtimeManager,
Features: featureFlags.AsProto(),
Component: componentConfig.AsProto(),
})
}
}
Expand Down Expand Up @@ -508,12 +528,13 @@ func injectInputPolicyID(fleetPolicy map[string]interface{}, inputConfig map[str
// of components.
func toIntermediate(policy map[string]interface{}, aliasMapping map[string]string, ll logp.Level, headers HeadersProvider) (map[string]outputI, error) {
const (
outputsKey = "outputs"
enabledKey = "enabled"
inputsKey = "inputs"
typeKey = "type"
idKey = "id"
useOutputKey = "use_output"
outputsKey = "outputs"
enabledKey = "enabled"
inputsKey = "inputs"
typeKey = "type"
idKey = "id"
useOutputKey = "use_output"
runtimeManagerKey = "_runtime_experimental"
)

// intermediate structure for output to input mapping (this structure allows different input types per output)
Expand Down Expand Up @@ -654,17 +675,35 @@ func toIntermediate(policy map[string]interface{}, aliasMapping map[string]strin
return nil, fmt.Errorf("invalid 'inputs.%d.log_level', %w", idx, err)
}

runtimeManager := DefaultRuntimeManager
// determine the runtime manager for the input
if runtimeManagerRaw, ok := input[runtimeManagerKey]; ok {
runtimeManagerStr, ok := runtimeManagerRaw.(string)
if !ok {
return nil, fmt.Errorf("invalid 'inputs.%d.runtime', expected a string, not a %T", idx, runtimeManagerRaw)
}
runtimeManagerVal := RuntimeManager(runtimeManagerStr)
switch runtimeManagerVal {
case OtelRuntimeManager, ProcessRuntimeManager:
default:
return nil, fmt.Errorf("invalid 'inputs.%d.runtime', valid values are: %s, %s", idx, OtelRuntimeManager, ProcessRuntimeManager)
}
runtimeManager = runtimeManagerVal
delete(input, runtimeManagerKey)
}

// Inject the top level fleet policy revision into each input configuration. This
// allows individual inputs (like endpoint) to detect policy changes more easily.
injectInputPolicyID(policy, input)

output.inputs[t] = append(output.inputs[t], inputI{
idx: idx,
id: id,
enabled: enabled,
logLevel: logLevel,
inputType: t,
config: input,
idx: idx,
id: id,
enabled: enabled,
logLevel: logLevel,
inputType: t,
config: input,
runtimeManager: runtimeManager,
})
}
if len(outputsMap) == 0 {
Expand All @@ -674,11 +713,12 @@ func toIntermediate(policy map[string]interface{}, aliasMapping map[string]strin
}

type inputI struct {
idx int
id string
enabled bool
logLevel client.UnitLogLevel
inputType string // canonical (non-alias) type
idx int
id string
enabled bool
logLevel client.UnitLogLevel
inputType string // canonical (non-alias) type
runtimeManager RuntimeManager

// The raw configuration for this input, with small cleanups:
// - the "enabled", "use_output", and "log_level" keys are removed
Expand Down
Loading