diff --git a/internal/pkg/agent/application/application.go b/internal/pkg/agent/application/application.go index 29c7fff51f9..42c0cbd8e29 100644 --- a/internal/pkg/agent/application/application.go +++ b/internal/pkg/agent/application/application.go @@ -240,7 +240,7 @@ func New( return nil, nil, nil, errors.New(err, "failed to initialize composable controller") } - otelManager, err := otelmanager.NewOTelManager(log.Named("otel_manager"), logLevel, baseLogger, otelmanager.EmbeddedExecutionMode, agentInfo, monitor.ComponentMonitoringConfig) + otelManager, err := otelmanager.NewOTelManager(log.Named("otel_manager"), logLevel, baseLogger, otelmanager.EmbeddedExecutionMode, agentInfo, monitor.ComponentMonitoringConfig, cfg.Settings.ProcessConfig.StopTimeout) if err != nil { return nil, nil, nil, fmt.Errorf("failed to create otel manager: %w", err) } diff --git a/internal/pkg/otel/manager/execution.go b/internal/pkg/otel/manager/execution.go index d55ff6c09f4..3814ab4137d 100644 --- a/internal/pkg/otel/manager/execution.go +++ b/internal/pkg/otel/manager/execution.go @@ -6,6 +6,7 @@ package manager import ( "context" + "time" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status" "go.opentelemetry.io/collector/confmap" @@ -18,5 +19,5 @@ type collectorExecution interface { } type collectorHandle interface { - Stop(ctx context.Context) + Stop(waitTime time.Duration) } diff --git a/internal/pkg/otel/manager/execution_embedded.go b/internal/pkg/otel/manager/execution_embedded.go index 696dde81428..9f67bc20e07 100644 --- a/internal/pkg/otel/manager/execution_embedded.go +++ b/internal/pkg/otel/manager/execution_embedded.go @@ -6,6 +6,7 @@ package manager import ( "context" + "time" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status" "go.opentelemetry.io/collector/confmap" @@ -66,7 +67,7 @@ type ctxHandle struct { } // Stop stops the collector -func (s *ctxHandle) Stop(ctx context.Context) { +func (s *ctxHandle) Stop(waitTime time.Duration) { 
if s.cancel == nil { return } @@ -74,7 +75,8 @@ func (s *ctxHandle) Stop(ctx context.Context) { s.cancel() select { - case <-ctx.Done(): + case <-time.After(waitTime): + return case <-s.collectorDoneCh: } } diff --git a/internal/pkg/otel/manager/execution_subprocess.go b/internal/pkg/otel/manager/execution_subprocess.go index a6ffe56159e..ba2394af004 100644 --- a/internal/pkg/otel/manager/execution_subprocess.go +++ b/internal/pkg/otel/manager/execution_subprocess.go @@ -30,8 +30,6 @@ import ( ) const ( - processKillAfter = 5 * time.Second - OtelSetSupervisedFlagName = "supervised" OtelSupervisedLoggingLevelFlagName = "supervised.logging.level" ) @@ -56,6 +54,7 @@ func newSubprocessExecution(logLevel logp.Level, collectorPath string) (*subproc }, logLevel: logLevel, healthCheckExtensionID: healthCheckExtensionID, + reportErrFn: reportErr, }, nil } @@ -64,6 +63,7 @@ type subprocessExecution struct { collectorArgs []string logLevel logp.Level healthCheckExtensionID string + reportErrFn func(ctx context.Context, errCh chan error, err error) // required for testing } // startCollector starts a supervised collector and monitors its health. 
Process exit errors are sent to the @@ -106,7 +106,6 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger procCtx, procCtxCancel := context.WithCancel(ctx) processInfo, err := process.Start(r.collectorPath, process.WithArgs(r.collectorArgs), - process.WithContext(procCtx), process.WithEnv(os.Environ()), process.WithCmdOptions(func(c *exec.Cmd) error { c.Stdin = bytes.NewReader(confBytes) @@ -130,6 +129,7 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger ctl := &procHandle{ processDoneCh: make(chan struct{}), processInfo: processInfo, + log: logger, } healthCheckDone := make(chan struct{}) @@ -196,14 +196,14 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger if procErr == nil { if procState.Success() { // report nil error so that the caller can be notified that the process has exited without error - reportErr(ctx, processErrCh, nil) + r.reportErrFn(ctx, processErrCh, nil) } else { - reportErr(ctx, processErrCh, fmt.Errorf("supervised collector (pid: %d) exited with error: %s", procState.Pid(), procState.String())) + r.reportErrFn(ctx, processErrCh, fmt.Errorf("supervised collector (pid: %d) exited with error: %s", procState.Pid(), procState.String())) } return } - reportErr(ctx, processErrCh, fmt.Errorf("failed to wait supervised collector process: %w", procErr)) + r.reportErrFn(ctx, processErrCh, fmt.Errorf("failed to wait supervised collector process: %w", procErr)) }() return ctl, nil @@ -212,11 +212,12 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger type procHandle struct { processDoneCh chan struct{} processInfo *process.Info + log *logger.Logger } -// Stop stops the process. If the process is already stopped, it does nothing. If the process does not stop within -// processKillAfter or due to an error, it will be killed. +// Stop stops the process. If the process is already stopped, it does nothing. If the process does not stop within +// waitTime or due to an error, it will be killed.
-func (s *procHandle) Stop(ctx context.Context) { +func (s *procHandle) Stop(waitTime time.Duration) { select { case <-s.processDoneCh: // process has already exited @@ -225,19 +226,18 @@ } if err := s.processInfo.Stop(); err != nil { + s.log.Warnf("failed to send stop signal to the supervised collector: %v", err) // we failed to stop the process just kill it and return _ = s.processInfo.Kill() return } select { - case <-ctx.Done(): + case <-time.After(waitTime): + s.log.Warnf("timeout waiting (%s) for the supervised collector to stop, killing it", waitTime.String()) - // our caller ctx is Done; kill the process just in case + // the process did not stop within waitTime; kill it just in case _ = s.processInfo.Kill() case <-s.processDoneCh: // process has already exited - case <-time.After(processKillAfter): - // process is still running kill it - _ = s.processInfo.Kill() } } diff --git a/internal/pkg/otel/manager/manager.go b/internal/pkg/otel/manager/manager.go index 14fcfcd0ead..84a553e2f43 100644 --- a/internal/pkg/otel/manager/manager.go +++ b/internal/pkg/otel/manager/manager.go @@ -101,6 +101,12 @@ type OTelManager struct { execution collectorExecution proc collectorHandle + + // collectorRunErr is used to signal that the collector has exited. + collectorRunErr chan error + + // stopTimeout is the timeout to wait for the collector to stop. + stopTimeout time.Duration } // NewOTelManager returns a OTelManager. 
@@ -111,6 +117,7 @@ func NewOTelManager( mode ExecutionMode, agentInfo info.Agent, beatMonitoringConfigGetter translate.BeatMonitoringConfigGetter, + stopTimeout time.Duration, ) (*OTelManager, error) { var exec collectorExecution var recoveryTimer collectorRecoveryTimer @@ -131,7 +138,7 @@ func NewOTelManager( recoveryTimer = newRestarterNoop() exec = newExecutionEmbedded() default: - return nil, errors.New("unknown otel collector exec") + return nil, fmt.Errorf("unknown otel collector execution mode: %q", mode) } logger.Debugf("Using collector execution mode: %s", mode) @@ -144,10 +151,12 @@ func NewOTelManager( errCh: make(chan error, 1), // holds at most one error collectorStatusCh: make(chan *status.AggregateStatus, 1), componentStateCh: make(chan []runtime.ComponentComponentState, 1), - updateCh: make(chan configUpdate), + updateCh: make(chan configUpdate, 1), doneChan: make(chan struct{}), execution: exec, recoveryTimer: recoveryTimer, + collectorRunErr: make(chan error), + stopTimeout: stopTimeout, }, nil } @@ -156,12 +165,6 @@ func (m *OTelManager) Run(ctx context.Context) error { var err error m.proc = nil - // signal that the run loop is ended to unblock any incoming update calls - defer close(m.doneChan) - - // collectorRunErr is used to signal that the collector has exited. - collectorRunErr := make(chan error) - // collectorStatusCh is used internally by the otel collector to send status updates to the manager // this channel is buffered because it's possible for the collector to send a status update while the manager is // waiting for the collector to exit @@ -169,11 +172,14 @@ func (m *OTelManager) Run(ctx context.Context) error { for { select { case <-ctx.Done(): + // signal that the run loop is ended to unblock any incoming update calls + close(m.doneChan) + m.recoveryTimer.Stop() // our caller context is cancelled so stop the collector and return // has exited. 
if m.proc != nil { - m.proc.Stop(ctx) + m.proc.Stop(m.stopTimeout) } return ctx.Err() case <-m.recoveryTimer.C(): @@ -187,7 +193,7 @@ func (m *OTelManager) Run(ctx context.Context) error { newRetries := m.recoveryRetries.Add(1) m.logger.Infof("collector recovery restarting, total retries: %d", newRetries) - m.proc, err = m.execution.startCollector(ctx, m.baseLogger, m.mergedCollectorCfg, collectorRunErr, collectorStatusCh) + m.proc, err = m.execution.startCollector(ctx, m.baseLogger, m.mergedCollectorCfg, m.collectorRunErr, collectorStatusCh) if err != nil { reportErr(ctx, m.errCh, err) // reset the restart timer to the next backoff @@ -197,12 +203,12 @@ func (m *OTelManager) Run(ctx context.Context) error { reportErr(ctx, m.errCh, nil) } - case err = <-collectorRunErr: + case err = <-m.collectorRunErr: m.recoveryTimer.Stop() if err == nil { // err is nil means that the collector has exited cleanly without an error if m.proc != nil { - m.proc.Stop(ctx) + m.proc.Stop(m.stopTimeout) m.proc = nil updateErr := m.reportOtelStatusUpdate(ctx, nil) if updateErr != nil { @@ -223,7 +229,7 @@ func (m *OTelManager) Run(ctx context.Context) error { // in this rare case the collector stopped running but a configuration was // provided and the collector stopped with a clean exit - m.proc, err = m.execution.startCollector(ctx, m.baseLogger, m.mergedCollectorCfg, collectorRunErr, collectorStatusCh) + m.proc, err = m.execution.startCollector(ctx, m.baseLogger, m.mergedCollectorCfg, m.collectorRunErr, collectorStatusCh) if err != nil { // failed to create the collector (this is different then // it's failing to run). 
we do not retry creation on failure @@ -245,7 +251,7 @@ func (m *OTelManager) Run(ctx context.Context) error { // in the case that the configuration is invalid there is no reason to // try again as it will keep failing so we do not trigger a restart if m.proc != nil { - m.proc.Stop(ctx) + m.proc.Stop(m.stopTimeout) m.proc = nil // don't wait here for <-collectorRunErr, already occurred // clear status, no longer running @@ -281,7 +287,7 @@ func (m *OTelManager) Run(ctx context.Context) error { m.components = cfgUpdate.components m.mx.Unlock() - err = m.applyMergedConfig(ctx, collectorStatusCh, collectorRunErr) + err = m.applyMergedConfig(ctx, collectorStatusCh, m.collectorRunErr) // report the error unconditionally to indicate that the config was applied reportErr(ctx, m.errCh, err) @@ -340,7 +346,7 @@ func buildMergedConfig(cfgUpdate configUpdate, agentInfo info.Agent, monitoringC func (m *OTelManager) applyMergedConfig(ctx context.Context, collectorStatusCh chan *status.AggregateStatus, collectorRunErr chan error) error { if m.proc != nil { - m.proc.Stop(ctx) + m.proc.Stop(m.stopTimeout) m.proc = nil select { case <-collectorRunErr: @@ -402,6 +408,15 @@ func (m *OTelManager) Update(cfg *confmap.Conf, components []component.Component collectorCfg: cfg, components: components, } + + // we care only about the latest config update + select { + case <-m.updateCh: + case <-m.doneChan: + return + default: + } + select { case m.updateCh <- cfgUpdate: case <-m.doneChan: diff --git a/internal/pkg/otel/manager/manager_test.go b/internal/pkg/otel/manager/manager_test.go index bb69f493fa5..e8ea07e626d 100644 --- a/internal/pkg/otel/manager/manager_test.go +++ b/internal/pkg/otel/manager/manager_test.go @@ -138,10 +138,11 @@ type mockCollectorHandle struct { cancel context.CancelFunc } -func (h *mockCollectorHandle) Stop(ctx context.Context) { +func (h *mockCollectorHandle) Stop(waitTime time.Duration) { h.cancel() select { - case <-ctx.Done(): + case <-time.After(waitTime): + 
return case <-h.stopCh: } } @@ -288,20 +289,22 @@ func TestOTelManager_Run(t *testing.T) { testBinary := filepath.Join(wd, "testing", "testing") require.FileExists(t, testBinary, "testing binary not found") + const waitTimeForStop = 30 * time.Second + for _, tc := range []struct { name string - exec func() (collectorExecution, error) + execModeFn func(collectorRunErr chan error) (collectorExecution, error) restarter collectorRecoveryTimer skipListeningErrors bool - testFn func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution) + testFn func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) }{ { name: "embedded collector config updates", - exec: func() (collectorExecution, error) { + execModeFn: func(collectorRunErr chan error) (collectorExecution, error) { return newExecutionEmbedded(), nil }, restarter: newRestarterNoop(), - testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution) { + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { // ensure that it got healthy cfg := confmap.NewFromStringMap(testConfig) updateTime := time.Now() @@ -322,11 +325,11 @@ func TestOTelManager_Run(t *testing.T) { }, { name: "subprocess collector config updates", - exec: func() (collectorExecution, error) { + execModeFn: func(collectorRunErr chan error) (collectorExecution, error) { return newSubprocessExecution(logp.DebugLevel, testBinary) }, restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), - testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution) { + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { // ensure that it got healthy cfg := confmap.NewFromStringMap(testConfig) updateTime := time.Now() @@ -348,11 
+351,11 @@ func TestOTelManager_Run(t *testing.T) { }, { name: "embedded collector stopped gracefully outside manager", - exec: func() (collectorExecution, error) { + execModeFn: func(collectorRunErr chan error) (collectorExecution, error) { return newExecutionEmbedded(), nil }, restarter: newRestarterNoop(), - testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution) { + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { // ensure that it got healthy cfg := confmap.NewFromStringMap(testConfig) updateTime := time.Now() @@ -361,8 +364,8 @@ func TestOTelManager_Run(t *testing.T) { // stop it, this should be restarted by the manager updateTime = time.Now() - require.NotNil(t, exec.handle, "exec handle should not be nil") - exec.handle.Stop(t.Context()) + require.NotNil(t, exec.handle, "execModeFn handle should not be nil") + exec.handle.Stop(waitTimeForStop) e.EnsureHealthy(t, updateTime) // no configuration should stop the runner @@ -374,11 +377,11 @@ func TestOTelManager_Run(t *testing.T) { }, { name: "subprocess collector stopped gracefully outside manager", - exec: func() (collectorExecution, error) { + execModeFn: func(collectorRunErr chan error) (collectorExecution, error) { return newSubprocessExecution(logp.DebugLevel, testBinary) }, restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), - testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution) { + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { // ensure that it got healthy cfg := confmap.NewFromStringMap(testConfig) updateTime := time.Now() @@ -387,8 +390,8 @@ func TestOTelManager_Run(t *testing.T) { // stop it, this should be restarted by the manager updateTime = time.Now() - require.NotNil(t, exec.handle, "exec handle should not be 
nil") - exec.handle.Stop(t.Context()) + require.NotNil(t, exec.handle, "execModeFn handle should not be nil") + exec.handle.Stop(waitTimeForStop) e.EnsureHealthy(t, updateTime) assert.EqualValues(t, 1, countHealthCheckExtensionStatuses(e.getStatus()), "health check extension status count should be 1") @@ -402,11 +405,11 @@ func TestOTelManager_Run(t *testing.T) { }, { name: "subprocess collector killed outside manager", - exec: func() (collectorExecution, error) { + execModeFn: func(collectorRunErr chan error) (collectorExecution, error) { return newSubprocessExecution(logp.DebugLevel, testBinary) }, restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), - testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution) { + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { // ensure that it got healthy cfg := confmap.NewFromStringMap(testConfig) updateTime := time.Now() @@ -419,9 +422,9 @@ func TestOTelManager_Run(t *testing.T) { for i := 0; i < 3; i++ { // kill it handle := exec.getProcessHandle() - require.NotNil(t, handle, "exec handle should not be nil, iteration ", i) + require.NotNil(t, handle, "execModeFn handle should not be nil, iteration ", i) pHandle, ok := handle.(*procHandle) - require.True(t, ok, "exec handle should be of type procHandle, iteration ", i) + require.True(t, ok, "execModeFn handle should be of type procHandle, iteration ", i) if oldPHandle != nil { require.NotEqual(t, pHandle.processInfo.PID, oldPHandle.processInfo.PID, "processes PIDs should be different, iteration ", i) } @@ -445,11 +448,11 @@ func TestOTelManager_Run(t *testing.T) { }, { name: "subprocess collector panics", - exec: func() (collectorExecution, error) { + execModeFn: func(collectorRunErr chan error) (collectorExecution, error) { return newSubprocessExecution(logp.DebugLevel, testBinary) }, restarter: 
newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), - testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution) { + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { err := os.Setenv("TEST_SUPERVISED_COLLECTOR_PANIC", (3 * time.Second).String()) require.NoError(t, err, "failed to set TEST_SUPERVISED_COLLECTOR_PANIC env var") t.Cleanup(func() { @@ -479,14 +482,120 @@ func TestOTelManager_Run(t *testing.T) { assert.GreaterOrEqual(t, uint32(3), seenRecoveredTimes, "recovery retries should be 3") }, }, + { + name: "subprocess collector killed if delayed and manager is stopped", + execModeFn: func(collectorRunErr chan error) (collectorExecution, error) { + subprocessExec, err := newSubprocessExecution(logp.DebugLevel, testBinary) + if err != nil { + return nil, err + } + subprocessExec.reportErrFn = func(ctx context.Context, errCh chan error, err error) { + // override the reportErrFn to send the error to this test collectorRunErr channel + // so we can listen to subprocess run errors + if errCh != collectorRunErr { + // if the error channel is not the one we expect, forward the error to the original reportErrFn + reportErr(ctx, errCh, err) + return + } + collectorRunErr <- err + } + return &testExecution{exec: subprocessExec}, nil + }, + restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { + delayDuration := 40 * time.Second // the otel manager stop timeout is waitTimeForStop (30 seconds) + t.Setenv("TEST_SUPERVISED_COLLECTOR_DELAY", delayDuration.String()) + + // ensure that it got healthy + cfg := confmap.NewFromStringMap(testConfig) + updateTime := time.Now() + m.Update(cfg, nil) + e.EnsureHealthy(t, updateTime) + + // stop the manager to simulate that 
elastic-agent is shutting down + managerCtxCancel() + + // wait for the manager to report done + select { + case <-m.doneChan: + case <-time.After(10 * time.Second): + require.Fail(t, "manager should have reported done") + case <-t.Context().Done(): + return + } + + // wait for the subprocess to exit by checking the collectorRunErr channel + select { + case err := <-collectorRunErr: + require.Error(t, err, "process should have exited with an error") + case <-t.Context().Done(): + return + case <-time.After(2 * waitTimeForStop): + require.Fail(t, "timeout waiting for process to exit") + } + }, + }, + { + name: "subprocess collector gracefully exited if delayed a bit and manager is stopped", + execModeFn: func(collectorRunErr chan error) (collectorExecution, error) { + subprocessExec, err := newSubprocessExecution(logp.DebugLevel, testBinary) + if err != nil { + return nil, err + } + subprocessExec.reportErrFn = func(ctx context.Context, errCh chan error, err error) { + // override the reportErrFn to send the error to this test collectorRunErr channel + // so we can listen to subprocess run errors + if errCh != collectorRunErr { + // if the error channel is not the one we expect, forward the error to the original reportErrFn + reportErr(ctx, errCh, err) + return + } + collectorRunErr <- err + } + return &testExecution{exec: subprocessExec}, nil + }, + restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { + delayDuration := 5 * time.Second // the otel manager stop timeout is waitTimeForStop (30 seconds) + t.Setenv("TEST_SUPERVISED_COLLECTOR_DELAY", delayDuration.String()) + + // ensure that it got healthy + cfg := confmap.NewFromStringMap(testConfig) + updateTime := time.Now() + m.Update(cfg, nil) + e.EnsureHealthy(t, updateTime) + + // stop the manager to simulate that elastic-agent is 
shutting down + managerCtxCancel() + + // wait for the manager to report done + select { + case <-m.doneChan: + case <-time.After(10 * time.Second): + require.Fail(t, "manager should have reported done") + case <-t.Context().Done(): + return + } + + // wait for the subprocess to exit by checking the collectorRunErr channel + select { + case err := <-collectorRunErr: + require.NoError(t, err, "process should have exited without an error") + case <-t.Context().Done(): + return + case <-time.After(2 * waitTimeForStop): + require.Fail(t, "timeout waiting for process to exit") + } + }, + }, { name: "embedded collector invalid config", - exec: func() (collectorExecution, error) { + execModeFn: func(collectorRunErr chan error) (collectorExecution, error) { return newExecutionEmbedded(), nil }, restarter: newRestarterNoop(), skipListeningErrors: true, - testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution) { + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { // Errors channel is non-blocking, should be able to send an Update that causes an error multiple // times without it blocking on sending over the errCh. 
for range 3 { @@ -522,12 +631,12 @@ func TestOTelManager_Run(t *testing.T) { }, { name: "subprocess collector invalid config", - exec: func() (collectorExecution, error) { + execModeFn: func(collectorRunErr chan error) (collectorExecution, error) { return newSubprocessExecution(logp.DebugLevel, testBinary) }, restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), skipListeningErrors: true, - testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution) { + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { // Errors channel is non-blocking, should be able to send an Update that causes an error multiple // times without it blocking on sending over the errCh. for range 3 { @@ -568,22 +677,24 @@ func TestOTelManager_Run(t *testing.T) { l, _ := loggertest.New("otel") base, obs := loggertest.New("otel") - executionMode, err := tc.exec() - require.NoError(t, err, "failed to create execution mode") - testExecutionMode := &testExecution{exec: executionMode} - m := &OTelManager{ logger: l, baseLogger: base, errCh: make(chan error, 1), // holds at most one error - updateCh: make(chan configUpdate), + updateCh: make(chan configUpdate, 1), collectorStatusCh: make(chan *status.AggregateStatus), componentStateCh: make(chan []runtime.ComponentComponentState, 1), doneChan: make(chan struct{}), + collectorRunErr: make(chan error), recoveryTimer: tc.restarter, - execution: testExecutionMode, + stopTimeout: waitTimeForStop, } + executionMode, err := tc.execModeFn(m.collectorRunErr) + require.NoError(t, err, "failed to create execution mode") + testExecutionMode := &testExecution{exec: executionMode} + m.execution = testExecutionMode + eListener := &EventListener{} defer func() { if !t.Failed() { @@ -609,12 +720,13 @@ func TestOTelManager_Run(t *testing.T) { var runErr error runWg.Add(1) + managerCtx, managerCancel := context.WithCancel(ctx) 
go func() { defer runWg.Done() - runErr = m.Run(ctx) + runErr = m.Run(managerCtx) }() - tc.testFn(t, m, eListener, testExecutionMode) + tc.testFn(t, m, eListener, testExecutionMode, managerCancel, m.collectorRunErr) cancel() runWg.Wait() @@ -628,9 +740,11 @@ func TestOTelManager_Run(t *testing.T) { func TestOTelManager_Logging(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + const waitTimeForStop = 30 * time.Second + base, obs := loggertest.New("otel") l, _ := loggertest.New("otel-manager") - m, err := NewOTelManager(l, logp.DebugLevel, base, EmbeddedExecutionMode, nil, nil) + m, err := NewOTelManager(l, logp.DebugLevel, base, EmbeddedExecutionMode, nil, nil, waitTimeForStop) require.NoError(t, err, "could not create otel manager") go func() { @@ -1042,7 +1156,7 @@ func TestOTelManagerEndToEnd(t *testing.T) { logger: testLogger, baseLogger: testLogger, errCh: make(chan error, 1), // holds at most one error - updateCh: make(chan configUpdate), + updateCh: make(chan configUpdate, 1), collectorStatusCh: make(chan *status.AggregateStatus, 1), componentStateCh: make(chan []runtime.ComponentComponentState, 1), doneChan: make(chan struct{}), @@ -1050,6 +1164,7 @@ func TestOTelManagerEndToEnd(t *testing.T) { execution: execution, agentInfo: agentInfo, beatMonitoringConfigGetter: beatMonitoringConfigGetter, + collectorRunErr: make(chan error), } // Start manager in a goroutine diff --git a/internal/pkg/otel/manager/testing/testing.go b/internal/pkg/otel/manager/testing/testing.go index 6ad11fd8644..e7afd25650d 100644 --- a/internal/pkg/otel/manager/testing/testing.go +++ b/internal/pkg/otel/manager/testing/testing.go @@ -13,7 +13,24 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/cmd" ) +// This is a test binary used by the OTEL manager unit tests. 
+// It launches a supervised collector using cmd.RunCollector, and can be +// configured via env vars to simulate different scenarios: +// - TEST_SUPERVISED_COLLECTOR_PANIC: triggers a panic after the given delay, +// allowing tests to verify the manager’s panic/restart behavior. +// - TEST_SUPERVISED_COLLECTOR_DELAY: delays process shutdown by the given +// duration, letting tests observe graceful termination handling. +// +// The binary exits with code 0 on a successful collector run (or when canceled), +// and code 1 if the collector returns an error. func main() { + var shutdownDelay time.Duration + var err error + shutdownDelayEnvVar := os.Getenv("TEST_SUPERVISED_COLLECTOR_DELAY") + if shutdownDelayEnvVar != "" { + shutdownDelay, _ = time.ParseDuration(shutdownDelayEnvVar) + } + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -28,9 +45,15 @@ func main() { }) } - err := cmd.RunCollector(ctx, nil, true, "debug") - if err == nil || errors.Is(err, context.Canceled) { - os.Exit(0) + exitCode := 0 + err = cmd.RunCollector(ctx, nil, true, "debug") + if err != nil && !errors.Is(err, context.Canceled) { + exitCode = 1 } - os.Exit(1) + + if shutdownDelay > 0 { + <-time.After(shutdownDelay) + } + + os.Exit(exitCode) }