diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index 410f22f1e47..88c900b6f64 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -222,10 +222,6 @@ type Coordinator struct { // the final config sent to the manager, contains both config from hybrid mode and from components finalOtelCfg *confmap.Conf - // This variable controls whether we run supported components in the Otel manager instead of the runtime manager. - // It's a temporary measure until we decide exactly how we want to control where specific components run. - runComponentsInOtelManager bool - caps capabilities.Capabilities modifiers []ComponentsModifier @@ -392,22 +388,21 @@ func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp. LogLevel: logLevel, } c := &Coordinator{ - logger: logger, - cfg: cfg, - agentInfo: agentInfo, - isManaged: isManaged, - specs: specs, - reexecMgr: reexecMgr, - upgradeMgr: upgradeMgr, - monitorMgr: monitorMgr, - runtimeMgr: runtimeMgr, - configMgr: configMgr, - varsMgr: varsMgr, - otelMgr: otelMgr, - runComponentsInOtelManager: false, // change this to run supported components in the Otel manager - caps: caps, - modifiers: modifiers, - state: state, + logger: logger, + cfg: cfg, + agentInfo: agentInfo, + isManaged: isManaged, + specs: specs, + reexecMgr: reexecMgr, + upgradeMgr: upgradeMgr, + monitorMgr: monitorMgr, + runtimeMgr: runtimeMgr, + configMgr: configMgr, + varsMgr: varsMgr, + otelMgr: otelMgr, + caps: caps, + modifiers: modifiers, + state: state, // Note: the uses of a buffered input channel in our broadcaster (the // third parameter to broadcaster.New) means that it is possible for // immediately adjacent writes/reads not to match, e.g.: @@ -1025,9 +1020,7 @@ func (c *Coordinator) DiagnosticHooks() diagnostics.Hooks { return o }, }, - } - if c.runComponentsInOtelManager { - otelComponentHook := diagnostics.Hook{ + diagnostics.Hook{ Name: "otel-final", Filename: "otel-final.yaml", Description: "Final otel configuration used by the Elastic Agent. Includes hybrid mode config and component config.", @@ -1042,8 +1035,7 @@ func (c *Coordinator) DiagnosticHooks() diagnostics.Hooks { } return o }, - } - hooks = append(hooks, otelComponentHook) + }, } return hooks } @@ -1473,6 +1465,11 @@ func (c *Coordinator) updateOtelManagerConfig(model *component.Model) error { if err != nil { c.logger.Errorf("failed to generate otel config: %v", err) } + componentIDs := make([]string, 0, len(model.Components)) + for _, comp := range model.Components { + componentIDs = append(componentIDs, comp.ID) + } + c.logger.With("component_ids", componentIDs).Warn("The Otel runtime manager is HIGHLY EXPERIMENTAL and only intended for testing. Use at your own risk.") } if componentOtelCfg != nil { err := finalOtelCfg.Merge(componentOtelCfg) @@ -1500,18 +1497,16 @@ func (c *Coordinator) updateOtelManagerConfig(model *component.Model) error { // splitModelBetweenManager splits the model components between the runtime manager and the otel manager. func (c *Coordinator) splitModelBetweenManagers(model *component.Model) (runtimeModel *component.Model, otelModel *component.Model) { - if !c.runComponentsInOtelManager { - // Runtime manager gets all the components, this is the default - otelModel = &component.Model{} - runtimeModel = model - return - } var otelComponents, runtimeComponents []component.Component for _, comp := range model.Components { - if configtranslate.IsComponentOtelSupported(&comp) { + switch comp.RuntimeManager { + case component.OtelRuntimeManager: otelComponents = append(otelComponents, comp) - } else { + case component.ProcessRuntimeManager: runtimeComponents = append(runtimeComponents, comp) + default: + // this should be impossible if we parse the configuration correctly + c.logger.Errorf("unknown runtime manager for component: %s, ignoring", comp.RuntimeManager) } } otelModel = &component.Model{ diff --git a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go index b0697948d29..40f05586bb6 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go @@ -990,12 +990,11 @@ func TestCoordinatorPolicyChangeUpdatesRuntimeAndOTelManagerWithOtelComponents(t managerChans: managerChans{ configManagerUpdate: configChan, }, - runtimeMgr: runtimeManager, - otelMgr: otelManager, - runComponentsInOtelManager: true, - specs: specs, - vars: emptyVars(t), - componentPIDTicker: time.NewTicker(time.Second * 30), + runtimeMgr: runtimeManager, + otelMgr: otelManager, + specs: specs, + vars: emptyVars(t), + componentPIDTicker: time.NewTicker(time.Second * 30), } // Create a policy with one input and one output (no otel configuration) @@ -1009,6 +1008,7 @@ inputs: - id: test-input type: filestream use_output: default + _runtime_experimental: otel - id: test-other-input type: system/metrics use_output: default diff --git a/internal/pkg/agent/application/coordinator/diagnostics_test.go b/internal/pkg/agent/application/coordinator/diagnostics_test.go index 19c4c2e2d97..93bfb340b23 100644 --- a/internal/pkg/agent/application/coordinator/diagnostics_test.go +++ b/internal/pkg/agent/application/coordinator/diagnostics_test.go @@ -44,6 +44,7 @@ func TestCoordinatorExpectedDiagnosticHooks(t *testing.T) { "components-actual", "state", "otel", + "otel-final", } coord := &Coordinator{} diff --git a/pkg/component/component.go b/pkg/component/component.go index bd1c4893171..91c844427d8 100644 --- a/pkg/component/component.go +++ b/pkg/component/component.go @@ -8,6 +8,8 @@ import ( "encoding/json" "errors" "fmt" + "maps" + "slices" "sort" "strings" @@ -30,11 +32,16 @@ type HeadersProvider interface { Headers() map[string]string } +type RuntimeManager string + const ( // defaultUnitLogLevel is the default log level that a unit will get if one is not defined. - defaultUnitLogLevel = client.UnitLogLevelInfo - headersKey = "headers" - elasticsearchType = "elasticsearch" + defaultUnitLogLevel = client.UnitLogLevelInfo + headersKey = "headers" + elasticsearchType = "elasticsearch" + ProcessRuntimeManager = RuntimeManager("process") + OtelRuntimeManager = RuntimeManager("otel") + DefaultRuntimeManager RuntimeManager = ProcessRuntimeManager ) // ErrInputRuntimeCheckFail error is used when an input specification runtime prevention check occurs. @@ -157,6 +164,8 @@ type Component struct { // The logical output type, i.e. the type of output that was requested. OutputType string `yaml:"output_type"` + RuntimeManager RuntimeManager `yaml:"-"` + // Units that should be running inside this component. Units []Unit `yaml:"units"` @@ -360,27 +369,37 @@ func (r *RuntimeSpecs) componentsForInputType( componentErr = ErrOutputNotSupported } - var units []Unit + unitsForRuntimeManager := make(map[RuntimeManager][]Unit) for _, input := range output.inputs[inputType] { if input.enabled { unitID := fmt.Sprintf("%s-%s", componentID, input.id) - units = append(units, unitForInput(input, unitID)) + unitsForRuntimeManager[input.runtimeManager] = append( + unitsForRuntimeManager[input.runtimeManager], + unitForInput(input, unitID), + ) } } - if len(units) > 0 { - // Populate the output units for this component - units = append(units, unitForOutput(output, componentID)) - components = append(components, Component{ - ID: componentID, - Err: componentErr, - InputSpec: &inputSpec, - InputType: inputType, - OutputType: output.outputType, - Units: units, - Features: featureFlags.AsProto(), - Component: componentConfig.AsProto(), - }) + // sort to ensure consistent order + runtimeManagers := slices.Collect(maps.Keys(unitsForRuntimeManager)) + slices.Sort(runtimeManagers) + for _, runtimeManager := range runtimeManagers { + units := unitsForRuntimeManager[runtimeManager] + if len(units) > 0 { + // Populate the output units for this component + units = append(units, unitForOutput(output, componentID)) + components = append(components, Component{ + ID: componentID, + Err: componentErr, + InputSpec: &inputSpec, + InputType: inputType, + OutputType: output.outputType, + Units: units, + RuntimeManager: runtimeManager, + Features: featureFlags.AsProto(), + Component: componentConfig.AsProto(), + }) + } } } else { for _, input := range output.inputs[inputType] { @@ -399,14 +418,15 @@ func (r *RuntimeSpecs) componentsForInputType( // each component gets its own output, because of unit isolation units = append(units, unitForOutput(output, componentID)) components = append(components, Component{ - ID: componentID, - Err: componentErr, - InputSpec: &inputSpec, - InputType: inputType, - OutputType: output.outputType, - Units: units, - Features: featureFlags.AsProto(), - Component: componentConfig.AsProto(), + ID: componentID, + Err: componentErr, + InputSpec: &inputSpec, + InputType: inputType, + OutputType: output.outputType, + Units: units, + RuntimeManager: input.runtimeManager, + Features: featureFlags.AsProto(), + Component: componentConfig.AsProto(), }) } } @@ -514,12 +534,13 @@ func injectInputPolicyID(fleetPolicy map[string]interface{}, inputConfig map[str // of components. func toIntermediate(policy map[string]interface{}, aliasMapping map[string]string, ll logp.Level, headers HeadersProvider) (map[string]outputI, error) { const ( - outputsKey = "outputs" - enabledKey = "enabled" - inputsKey = "inputs" - typeKey = "type" - idKey = "id" - useOutputKey = "use_output" + outputsKey = "outputs" + enabledKey = "enabled" + inputsKey = "inputs" + typeKey = "type" + idKey = "id" + useOutputKey = "use_output" + runtimeManagerKey = "_runtime_experimental" ) // intermediate structure for output to input mapping (this structure allows different input types per output) @@ -660,17 +681,35 @@ func toIntermediate(policy map[string]interface{}, aliasMapping map[string]strin return nil, fmt.Errorf("invalid 'inputs.%d.log_level', %w", idx, err) } + runtimeManager := DefaultRuntimeManager + // determine the runtime manager for the input + if runtimeManagerRaw, ok := input[runtimeManagerKey]; ok { + runtimeManagerStr, ok := runtimeManagerRaw.(string) + if !ok { + return nil, fmt.Errorf("invalid 'inputs.%d.runtime', expected a string, not a %T", idx, runtimeManagerRaw) + } + runtimeManagerVal := RuntimeManager(runtimeManagerStr) + switch runtimeManagerVal { + case OtelRuntimeManager, ProcessRuntimeManager: + default: + return nil, fmt.Errorf("invalid 'inputs.%d.runtime', valid values are: %s, %s", idx, OtelRuntimeManager, ProcessRuntimeManager) + } + runtimeManager = runtimeManagerVal + delete(input, runtimeManagerKey) + } + // Inject the top level fleet policy revision into each input configuration. This // allows individual inputs (like endpoint) to detect policy changes more easily. injectInputPolicyID(policy, input) output.inputs[t] = append(output.inputs[t], inputI{ - idx: idx, - id: id, - enabled: enabled, - logLevel: logLevel, - inputType: t, - config: input, + idx: idx, + id: id, + enabled: enabled, + logLevel: logLevel, + inputType: t, + config: input, + runtimeManager: runtimeManager, }) } if len(outputsMap) == 0 { @@ -680,11 +719,12 @@ func toIntermediate(policy map[string]interface{}, aliasMapping map[string]strin } type inputI struct { - idx int - id string - enabled bool - logLevel client.UnitLogLevel - inputType string // canonical (non-alias) type + idx int + id string + enabled bool + logLevel client.UnitLogLevel + inputType string // canonical (non-alias) type + runtimeManager RuntimeManager // The raw configuration for this input, with small cleanups: // - the "enabled", "use_output", and "log_level" keys are removed diff --git a/pkg/component/component_test.go b/pkg/component/component_test.go index 23c4685d36e..1e84cf97ac4 100644 --- a/pkg/component/component_test.go +++ b/pkg/component/component_test.go @@ -237,6 +237,44 @@ func TestToComponents(t *testing.T) { }, Err: "invalid 'inputs.0.type', expected a string not a int", }, + { + Name: "Invalid: input runtime manager type not a string", + Platform: linuxAMD64Platform, + Policy: map[string]interface{}{ + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "type": "elasticsearch", + "enabled": true, + }, + }, + "inputs": []interface{}{ + map[string]interface{}{ + "type": "filestream", + "_runtime_experimental": 0, + }, + }, + }, + Err: "invalid 'inputs.0.runtime', expected a string, not a int", + }, + { + Name: "Invalid: input runtime manager value", + Platform: linuxAMD64Platform, + Policy: map[string]interface{}{ + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "type": "elasticsearch", + "enabled": true, + }, + }, + "inputs": []interface{}{ + map[string]interface{}{ + "type": "filestream", + "_runtime_experimental": "invalid", + }, + }, + }, + Err: "invalid 'inputs.0.runtime', valid values are: otel, process", + }, { Name: "Invalid: inputs entry duplicate because of missing id", Platform: linuxAMD64Platform, @@ -484,6 +522,7 @@ func TestToComponents(t *testing.T) { }), }, }, + RuntimeManager: DefaultRuntimeManager, }, }, }, @@ -533,6 +572,7 @@ func TestToComponents(t *testing.T) { }), }, }, + RuntimeManager: DefaultRuntimeManager, }, }, }, @@ -594,6 +634,7 @@ func TestToComponents(t *testing.T) { }), }, }, + RuntimeManager: DefaultRuntimeManager, }, }, }, @@ -662,6 +703,7 @@ func TestToComponents(t *testing.T) { Err: fmt.Errorf("decoding error: %w", fmt.Errorf("decoding failed due to the following error(s):\n\n%w", errors.Join(errors.New("'meta' expected a map, got 'slice'")))), }, }, + RuntimeManager: DefaultRuntimeManager, }, }, }, @@ -724,6 +766,7 @@ func TestToComponents(t *testing.T) { }), }, }, + RuntimeManager: DefaultRuntimeManager, }, { InputType: "cloudbeat", @@ -750,6 +793,7 @@ func TestToComponents(t *testing.T) { Err: fmt.Errorf("decoding error: %w", fmt.Errorf("decoding failed due to the following error(s):\n\n%w", errors.Join(errors.New("'meta' expected a map, got 'slice'")))), }, }, + RuntimeManager: DefaultRuntimeManager, }, }, }, @@ -885,6 +929,104 @@ func TestToComponents(t *testing.T) { }), }, }, + RuntimeManager: DefaultRuntimeManager, + }, + }, + }, + { + Name: "Different runtime managers", + Platform: linuxAMD64Platform, + Policy: map[string]interface{}{ + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "type": "elasticsearch", + "enabled": true, + }, + }, + "inputs": []interface{}{ + map[string]interface{}{ + "type": "filestream", + "id": "filestream-0", + }, + map[string]interface{}{ + "type": "filestream", + "id": "filestream-1", + "_runtime_experimental": "process", + }, + map[string]interface{}{ + "type": "filestream", + "id": "filestream-2", + "_runtime_experimental": "otel", + }, + }, + }, + Result: []Component{ + { + InputType: "filestream", + OutputType: "elasticsearch", + InputSpec: &InputRuntimeSpec{ + InputType: "filestream", + BinaryName: "testbeat", + BinaryPath: filepath.Join("..", "..", "specs", "testbeat"), + }, + Units: []Unit{ + { + ID: "filestream-default", + Type: client.UnitTypeOutput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "elasticsearch", + }), + }, + { + ID: "filestream-default-filestream-2", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "filestream", + "id": "filestream-2", + }), + }, + }, + RuntimeManager: OtelRuntimeManager, + }, + { + InputType: "filestream", + OutputType: "elasticsearch", + InputSpec: &InputRuntimeSpec{ + InputType: "filestream", + BinaryName: "testbeat", + BinaryPath: filepath.Join("..", "..", "specs", "testbeat"), + }, + Units: []Unit{ + { + ID: "filestream-default", + Type: client.UnitTypeOutput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "elasticsearch", + }), + }, + { + ID: "filestream-default-filestream-0", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "filestream", + "id": "filestream-0", + }), + }, + { + ID: "filestream-default-filestream-1", + Type: client.UnitTypeInput, + LogLevel: defaultUnitLogLevel, + Config: MustExpectedConfig(map[string]interface{}{ + "type": "filestream", + "id": "filestream-1", + }), + }, + }, + RuntimeManager: ProcessRuntimeManager, }, }, }, @@ -939,6 +1081,7 @@ func TestToComponents(t *testing.T) { }), }, }, + RuntimeManager: DefaultRuntimeManager, }, }, }, @@ -994,6 +1137,7 @@ func TestToComponents(t *testing.T) { }), }, }, + RuntimeManager: DefaultRuntimeManager, }, }, }, @@ -1049,6 +1193,7 @@ func TestToComponents(t *testing.T) { }), }, }, + RuntimeManager: DefaultRuntimeManager, }, }, }, @@ -1105,6 +1250,7 @@ func TestToComponents(t *testing.T) { }), }, }, + RuntimeManager: DefaultRuntimeManager, }, }, }, @@ -1161,6 +1307,7 @@ func TestToComponents(t *testing.T) { }), }, }, + RuntimeManager: DefaultRuntimeManager, }, }, }, @@ -1274,6 +1421,7 @@ func TestToComponents(t *testing.T) { }), }, }, + RuntimeManager: DefaultRuntimeManager, }, { InputType: "filestream", @@ -1311,6 +1459,7 @@ func TestToComponents(t *testing.T) { }), }, }, + RuntimeManager: DefaultRuntimeManager, }, { InputType: "log", @@ -1347,6 +1496,7 @@ func TestToComponents(t *testing.T) { }), }, }, + RuntimeManager: DefaultRuntimeManager, }, { InputType: "log", @@ -1375,6 +1525,7 @@ func TestToComponents(t *testing.T) { }, "log"), }, }, + RuntimeManager: DefaultRuntimeManager, }, { InputType: "log", @@ -1403,6 +1554,7 @@ func TestToComponents(t *testing.T) { }, "log"), }, }, + RuntimeManager: DefaultRuntimeManager, }, { InputType: "log", @@ -1431,6 +1583,7 @@ func TestToComponents(t *testing.T) { }, "log"), }, }, + RuntimeManager: DefaultRuntimeManager, }, { InputType: "apm", @@ -1459,6 +1612,7 @@ func TestToComponents(t *testing.T) { }), }, }, + RuntimeManager: DefaultRuntimeManager, }, }, }, @@ -1563,6 +1717,7 @@ func TestToComponents(t *testing.T) { }), }, }, + RuntimeManager: DefaultRuntimeManager, }, { InputType: "cloudbeat", @@ -1591,6 +1746,7 @@ func TestToComponents(t *testing.T) { }), }, }, + RuntimeManager: DefaultRuntimeManager, }, { InputType: "cloudbeat", @@ -1619,6 +1775,7 @@ func TestToComponents(t *testing.T) { }), }, }, + RuntimeManager: DefaultRuntimeManager, }, { InputType: "cloudbeat", @@ -1647,6 +1804,7 @@ func TestToComponents(t *testing.T) { }), }, }, + RuntimeManager: DefaultRuntimeManager, }, { InputType: "cloudbeat", @@ -1675,6 +1833,7 @@ func TestToComponents(t *testing.T) { }, "cloudbeat"), }, }, + RuntimeManager: DefaultRuntimeManager, }, { InputType: "cloudbeat", @@ -1703,6 +1862,7 @@ func TestToComponents(t *testing.T) { }), }, }, + RuntimeManager: DefaultRuntimeManager, }, { InputType: "cloudbeat", @@ -1731,6 +1891,7 @@ func TestToComponents(t *testing.T) { }, "cloudbeat"), }, }, + RuntimeManager: DefaultRuntimeManager, }, { InputType: "cloudbeat", @@ -1759,6 +1920,7 @@ func TestToComponents(t *testing.T) { }, "cloudbeat"), }, }, + RuntimeManager: DefaultRuntimeManager, }, { InputType: "cloudbeat", @@ -1787,6 +1949,7 @@ func TestToComponents(t *testing.T) { }, "cloudbeat"), }, }, + RuntimeManager: DefaultRuntimeManager, }, { InputType: "apm", @@ -1815,6 +1978,7 @@ func TestToComponents(t *testing.T) { }), }, }, + RuntimeManager: DefaultRuntimeManager, }, }, }, @@ -1878,6 +2042,7 @@ func TestToComponents(t *testing.T) { }), }, }, + RuntimeManager: DefaultRuntimeManager, }, }, }, @@ -1930,6 +2095,7 @@ func TestToComponents(t *testing.T) { }), }, }, + RuntimeManager: DefaultRuntimeManager, }, }, headers: &testHeadersProvider{headers: map[string]string{ @@ -1985,6 +2151,7 @@ func TestToComponents(t *testing.T) { }), }, }, + RuntimeManager: DefaultRuntimeManager, }, }, headers: &testHeadersProvider{headers: map[string]string{ @@ -2044,6 +2211,7 @@ func TestToComponents(t *testing.T) { }), }, }, + RuntimeManager: DefaultRuntimeManager, }, }, headers: &testHeadersProvider{headers: map[string]string{ @@ -2103,6 +2271,7 @@ func TestToComponents(t *testing.T) { }), }, }, + RuntimeManager: DefaultRuntimeManager, }, }, headers: &testHeadersProvider{headers: map[string]string{ @@ -2155,6 +2324,7 @@ func TestToComponents(t *testing.T) { }), }, }, + RuntimeManager: DefaultRuntimeManager, }, }, headers: &testHeadersProvider{headers: map[string]string{ @@ -2207,6 +2377,7 @@ func TestToComponents(t *testing.T) { }), }, }, + RuntimeManager: DefaultRuntimeManager, }, }, headers: &testHeadersProvider{headers: map[string]string{ @@ -2259,6 +2430,7 @@ func TestToComponents(t *testing.T) { }), }, }, + RuntimeManager: DefaultRuntimeManager, }, }, headers: &testHeadersProvider{headers: map[string]string{ @@ -2311,6 +2483,7 @@ func TestToComponents(t *testing.T) { }), }, }, + RuntimeManager: DefaultRuntimeManager, }, }, headers: &testHeadersProvider{headers: map[string]string{ diff --git a/testing/integration/diagnostics_test.go b/testing/integration/diagnostics_test.go index d43dd3f8aa7..fb62e218586 100644 --- a/testing/integration/diagnostics_test.go +++ b/testing/integration/diagnostics_test.go @@ -45,6 +45,7 @@ var diagnosticsFiles = []string{ "local-config.yaml", "mutex.pprof.gz", "otel.yaml", + "otel-final.yaml", "pre-config.yaml", "local-config.yaml", "state.yaml", diff --git a/testing/integration/otel_test.go b/testing/integration/otel_test.go index 1fd854aeeba..292c22124b2 100644 --- a/testing/integration/otel_test.go +++ b/testing/integration/otel_test.go @@ -979,6 +979,141 @@ service: require.True(t, err == nil || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded), "Retrieved unexpected error: %s", err.Error()) } +func TestOtelFilestreamInput(t *testing.T) { + info := define.Require(t, define.Requirements{ + Group: Default, + Local: true, + OS: []define.OS{ + {Type: define.Windows}, + {Type: define.Linux}, + {Type: define.Darwin}, + }, + Stack: &define.Stack{}, + }) + tmpDir := t.TempDir() + numEvents := 50 + // Create the data file to ingest + inputFile, err := os.CreateTemp(tmpDir, "input.txt") + require.NoError(t, err, "failed to create temp file to hold data to ingest") + inputFilePath := inputFile.Name() + for i := 0; i < numEvents; i++ { + _, err = inputFile.Write([]byte(fmt.Sprintf("Line %d\n", i))) + require.NoErrorf(t, err, "failed to write line %d to temp file", i) + } + err = inputFile.Close() + require.NoError(t, err, "failed to close data temp file") + t.Cleanup(func() { + if t.Failed() { + contents, err := os.ReadFile(inputFilePath) + if err != nil { + t.Logf("no data file to import at %s", inputFilePath) + return + } + t.Logf("contents of import file:\n%s\n", string(contents)) + } + }) + + fixture, err := define.NewFixtureFromLocalBuild(t, define.Version()) + require.NoError(t, err) + + // Create the otel configuration file + type otelConfigOptions struct { + InputPath string + ESEndpoint string + ESApiKey string + } + esEndpoint, err := getESHost() + require.NoError(t, err, "error getting elasticsearch endpoint") + esApiKey, err := createESApiKey(info.ESClient) + require.NoError(t, err, "error creating API key") + require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey) + configTemplate := `inputs: + - type: filestream + id: filestream-e2e + use_output: default + _runtime_experimental: otel + streams: + - id: e2e + data_stream: + dataset: e2e + paths: + - {{.InputPath}} + prospector.scanner.fingerprint.enabled: false + file_identity.native: ~ +outputs: + default: + type: elasticsearch + hosts: [{{.ESEndpoint}}] + api_key: "{{.ESApiKey}}" + preset: "balanced" +agent: + monitoring: + metrics: false + logs: false +` + index := ".ds-logs-e2e-*" + var configBuffer bytes.Buffer + require.NoError(t, + template.Must(template.New("config").Parse(configTemplate)).Execute(&configBuffer, + otelConfigOptions{ + InputPath: inputFilePath, + ESEndpoint: esEndpoint, + ESApiKey: esApiKey.Encoded, + })) + + ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(5*time.Minute)) + defer cancel() + err = fixture.Prepare(ctx) + require.NoError(t, err) + + err = fixture.Configure(ctx, configBuffer.Bytes()) + + cmd, err := fixture.PrepareAgentCommand(ctx, nil) + require.NoError(t, err, "cannot prepare Elastic-Agent command: %w", err) + + output := strings.Builder{} + cmd.Stderr = &output + cmd.Stdout = &output + + err = cmd.Start() + require.NoError(t, err) + + // Make sure the Elastic-Agent process is not running before + // exiting the test + t.Cleanup(func() { + // Ignore the error because we cancelled the context, + // and that always returns an error + _ = cmd.Wait() + if t.Failed() { + t.Log("Elastic-Agent output:") + t.Log(output.String()) + } + }) + + // Make sure find the logs + actualHits := &struct{ Hits int }{} + assert.EventuallyWithT(t, + func(ct *assert.CollectT) { + findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer findCancel() + + docs, err := estools.GetLogsForIndexWithContext(findCtx, info.ESClient, index, map[string]interface{}{ + "log.file.path": inputFilePath, + }) + require.NoError(ct, err) + + actualHits.Hits = docs.Hits.Total.Value + output, execErr := fixture.ExecStatus(context.Background()) + require.NoError(ct, execErr) + t.Logf("status output: %v", output) + assert.Equal(ct, numEvents, actualHits.Hits) + }, + 2*time.Minute, 5*time.Second, + "Expected %d logs, got %v", numEvents, actualHits) + + cancel() +} + func TestOtelMBReceiverE2E(t *testing.T) { info := define.Require(t, define.Requirements{ Group: Default, @@ -1180,7 +1315,7 @@ outputs: type: elasticsearch hosts: [{{.ESEndpoint}}] api_key: {{.BeatsESApiKey}} - compression_level: 0 + compression_level: 0 receivers: filebeatreceiver: filebeat: