2 changes: 1 addition & 1 deletion internal/pkg/agent/application/application.go
@@ -238,7 +238,7 @@ func New(
return nil, nil, nil, errors.New(err, "failed to initialize composable controller")
}

-otelManager, err := otelmanager.NewOTelManager(log.Named("otel_manager"), logLevel, baseLogger, otelmanager.EmbeddedExecutionMode, agentInfo, monitor.ComponentMonitoringConfig)
+otelManager, err := otelmanager.NewOTelManager(log.Named("otel_manager"), logLevel, baseLogger, otelmanager.EmbeddedExecutionMode, agentInfo, monitor.ComponentMonitoringConfig, cfg.Settings.ProcessConfig.StopTimeout)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create otel manager: %w", err)
}
3 changes: 2 additions & 1 deletion internal/pkg/otel/manager/execution.go
@@ -6,6 +6,7 @@ package manager

import (
"context"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
"go.opentelemetry.io/collector/confmap"
@@ -18,5 +19,5 @@ type collectorExecution interface {
}

type collectorHandle interface {
-Stop(ctx context.Context)
+Stop(waitTime time.Duration)
}
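
The interface change above swaps the caller context for an explicit wait budget: a handle signals shutdown and then waits at most waitTime before returning. A minimal sketch of an implementation honoring that contract (names are illustrative, not from this PR):

```go
package sketch

import "time"

// sketchHandle is a hypothetical collectorHandle implementation: it signals
// shutdown, then waits at most waitTime for the collector to finish.
type sketchHandle struct {
	stop chan struct{} // closed to request shutdown
	done chan struct{} // closed by the collector goroutine when it exits
}

func (h *sketchHandle) Stop(waitTime time.Duration) {
	close(h.stop)
	select {
	case <-h.done:
		// collector exited within the budget
	case <-time.After(waitTime):
		// budget exhausted; return and let the caller proceed
	}
}
```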
6 changes: 4 additions & 2 deletions internal/pkg/otel/manager/execution_embedded.go
@@ -6,6 +6,7 @@ package manager

import (
"context"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
"go.opentelemetry.io/collector/confmap"
@@ -66,15 +67,16 @@ type ctxHandle struct {
}

// Stop stops the collector
-func (s *ctxHandle) Stop(ctx context.Context) {
+func (s *ctxHandle) Stop(waitTime time.Duration) {
if s.cancel == nil {
return
}

s.cancel()

select {
-case <-ctx.Done():
+case <-time.After(waitTime):
+return
case <-s.collectorDoneCh:
}
}
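
The embedded handle shows why the signature changed: the manager used to call Stop(ctx) with a context that was often already cancelled on the shutdown path, so <-ctx.Done() fired immediately and the collector got no grace period. A side-by-side sketch of the two behaviors (assumed channel names, not PR code):

```go
package sketch

import (
	"context"
	"time"
)

// Old contract: wait on the caller's context. During shutdown ctx is already
// cancelled, so this returns at once without waiting for the collector to
// actually finish.
func stopViaCtx(ctx context.Context, collectorDone <-chan struct{}) {
	select {
	case <-ctx.Done():
	case <-collectorDone:
	}
}

// New contract: a fixed budget that is honored even when shutdown is already
// in progress.
func stopViaBudget(waitTime time.Duration, collectorDone <-chan struct{}) {
	select {
	case <-time.After(waitTime):
		return // give up after waitTime
	case <-collectorDone:
	}
}
```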
22 changes: 11 additions & 11 deletions internal/pkg/otel/manager/execution_subprocess.go
@@ -30,8 +30,6 @@ import (
)

const (
-processKillAfter = 5 * time.Second
-
OtelSetSupervisedFlagName = "supervised"
OtelSupervisedLoggingLevelFlagName = "supervised.logging.level"
)
@@ -56,6 +54,7 @@ func newSubprocessExecution(logLevel logp.Level, collectorPath string) (*subproc
},
logLevel: logLevel,
healthCheckExtensionID: healthCheckExtensionID,
+reportErrFn: reportErr,
}, nil
}

@@ -64,6 +63,7 @@ type subprocessExecution struct {
collectorArgs []string
logLevel logp.Level
healthCheckExtensionID string
+reportErrFn func(ctx context.Context, errCh chan error, err error) // required for testing
}

// startCollector starts a supervised collector and monitors its health. Process exit errors are sent to the
@@ -106,7 +106,6 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger
procCtx, procCtxCancel := context.WithCancel(ctx)
processInfo, err := process.Start(r.collectorPath,
process.WithArgs(r.collectorArgs),
-process.WithContext(procCtx),
process.WithEnv(os.Environ()),
process.WithCmdOptions(func(c *exec.Cmd) error {
c.Stdin = bytes.NewReader(confBytes)
@@ -130,6 +129,7 @@
ctl := &procHandle{
processDoneCh: make(chan struct{}),
processInfo: processInfo,
+log: logger,
}

healthCheckDone := make(chan struct{})
@@ -196,14 +196,14 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger
if procErr == nil {
if procState.Success() {
// report nil error so that the caller can be notified that the process has exited without error
-reportErr(ctx, processErrCh, nil)
+r.reportErrFn(ctx, processErrCh, nil)
} else {
reportErr(ctx, processErrCh, fmt.Errorf("supervised collector (pid: %d) exited with error: %s", procState.Pid(), procState.String()))
r.reportErrFn(ctx, processErrCh, fmt.Errorf("supervised collector (pid: %d) exited with error: %s", procState.Pid(), procState.String()))
}
return
}

reportErr(ctx, processErrCh, fmt.Errorf("failed to wait supervised collector process: %w", procErr))
r.reportErrFn(ctx, processErrCh, fmt.Errorf("failed to wait supervised collector process: %w", procErr))
}()

return ctl, nil
@@ -212,11 +212,12 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger
type procHandle struct {
processDoneCh chan struct{}
processInfo *process.Info
+log *logger.Logger
}

// Stop stops the process. If the process is already stopped, it does nothing. If the process does not stop within
// processKillAfter or due to an error, it will be killed.
-func (s *procHandle) Stop(ctx context.Context) {
+func (s *procHandle) Stop(waitTime time.Duration) {
select {
case <-s.processDoneCh:
// process has already exited
@@ -225,19 +226,18 @@ func (s *procHandle) Stop(ctx context.Context) {
}

if err := s.processInfo.Stop(); err != nil {
s.log.Warnf("failed to send stop signal to the supervised collector: %v", err)
// we failed to stop the process just kill it and return
_ = s.processInfo.Kill()
return
}

select {
-case <-ctx.Done():
-// our caller ctx is Done; kill the process just in case
+case <-time.After(waitTime):
+s.log.Warnf("timeout waiting (%s) for the supervised collector to stop, killing it", waitTime.String())
_ = s.processInfo.Kill()
case <-s.processDoneCh:
// process has already exited
-case <-time.After(processKillAfter):
-// process is still running kill it
-_ = s.processInfo.Kill()
}
}
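
For the subprocess case the new Stop is an escalation ladder: return if the process already exited, send the stop signal, wait up to waitTime for a voluntary exit, and only then kill. A compressed, self-contained sketch of that sequence (the stopper interface is a stand-in for process.Info, not an API from this PR):

```go
package sketch

import "time"

// stopper stands in for the two calls procHandle makes on process.Info.
type stopper interface {
	Stop() error // ask the process to exit (e.g. a termination signal)
	Kill() error // force-terminate it
}

// stopProcess mirrors procHandle.Stop: signal first, wait at most waitTime
// for a voluntary exit, and only escalate to Kill when necessary.
func stopProcess(p stopper, done <-chan struct{}, waitTime time.Duration) {
	select {
	case <-done:
		return // already exited; nothing to do
	default:
	}

	if err := p.Stop(); err != nil {
		_ = p.Kill() // could not even signal the process: kill it outright
		return
	}

	select {
	case <-time.After(waitTime):
		_ = p.Kill() // grace period exhausted
	case <-done:
		// clean exit within the budget
	}
}
```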
47 changes: 31 additions & 16 deletions internal/pkg/otel/manager/manager.go
@@ -101,6 +101,12 @@ type OTelManager struct {
execution collectorExecution

proc collectorHandle

+// collectorRunErr is used to signal that the collector has exited.
+collectorRunErr chan error
+
+// stopTimeout is the timeout to wait for the collector to stop.
+stopTimeout time.Duration
}

// NewOTelManager returns a OTelManager.
@@ -111,6 +117,7 @@ func NewOTelManager(
mode ExecutionMode,
agentInfo info.Agent,
beatMonitoringConfigGetter translate.BeatMonitoringConfigGetter,
+stopTimeout time.Duration,
) (*OTelManager, error) {
var exec collectorExecution
var recoveryTimer collectorRecoveryTimer
@@ -131,7 +138,7 @@ func NewOTelManager(
recoveryTimer = newRestarterNoop()
exec = newExecutionEmbedded()
default:
-return nil, errors.New("unknown otel collector exec")
+return nil, fmt.Errorf("unknown otel collector execution mode: %q", mode)
}

logger.Debugf("Using collector execution mode: %s", mode)
@@ -144,10 +151,12 @@ func NewOTelManager(
errCh: make(chan error, 1), // holds at most one error
collectorStatusCh: make(chan *status.AggregateStatus, 1),
componentStateCh: make(chan []runtime.ComponentComponentState, 1),
-updateCh: make(chan configUpdate),
+updateCh: make(chan configUpdate, 1),
doneChan: make(chan struct{}),
execution: exec,
recoveryTimer: recoveryTimer,
+collectorRunErr: make(chan error),
+stopTimeout: stopTimeout,
}, nil
}

@@ -156,24 +165,21 @@ func (m *OTelManager) Run(ctx context.Context) error {
var err error
m.proc = nil

+// signal that the run loop is ended to unblock any incoming update calls
+defer close(m.doneChan)
+
-// collectorRunErr is used to signal that the collector has exited.
-collectorRunErr := make(chan error)
-
// collectorStatusCh is used internally by the otel collector to send status updates to the manager
// this channel is buffered because it's possible for the collector to send a status update while the manager is
// waiting for the collector to exit
collectorStatusCh := make(chan *status.AggregateStatus, 1)
for {
select {
case <-ctx.Done():
-// signal that the run loop is ended to unblock any incoming update calls
-close(m.doneChan)
-
m.recoveryTimer.Stop()
// our caller context is cancelled so stop the collector and return once it
// has exited.
if m.proc != nil {
-m.proc.Stop(ctx)
+m.proc.Stop(m.stopTimeout)
}
return ctx.Err()
case <-m.recoveryTimer.C():
@@ -187,7 +193,7 @@ func (m *OTelManager) Run(ctx context.Context) error {

newRetries := m.recoveryRetries.Add(1)
m.logger.Infof("collector recovery restarting, total retries: %d", newRetries)
-m.proc, err = m.execution.startCollector(ctx, m.baseLogger, m.mergedCollectorCfg, collectorRunErr, collectorStatusCh)
+m.proc, err = m.execution.startCollector(ctx, m.baseLogger, m.mergedCollectorCfg, m.collectorRunErr, collectorStatusCh)
if err != nil {
reportErr(ctx, m.errCh, err)
// reset the restart timer to the next backoff
@@ -197,12 +203,12 @@ func (m *OTelManager) Run(ctx context.Context) error {
reportErr(ctx, m.errCh, nil)
}

-case err = <-collectorRunErr:
+case err = <-m.collectorRunErr:
m.recoveryTimer.Stop()
if err == nil {
// err is nil means that the collector has exited cleanly without an error
if m.proc != nil {
-m.proc.Stop(ctx)
+m.proc.Stop(m.stopTimeout)
m.proc = nil
updateErr := m.reportOtelStatusUpdate(ctx, nil)
if updateErr != nil {
@@ -223,7 +229,7 @@ func (m *OTelManager) Run(ctx context.Context) error {

// in this rare case the collector stopped running but a configuration was
// provided and the collector stopped with a clean exit
-m.proc, err = m.execution.startCollector(ctx, m.baseLogger, m.mergedCollectorCfg, collectorRunErr, collectorStatusCh)
+m.proc, err = m.execution.startCollector(ctx, m.baseLogger, m.mergedCollectorCfg, m.collectorRunErr, collectorStatusCh)
if err != nil {
// failed to create the collector (this is different then
// it's failing to run). we do not retry creation on failure
@@ -245,7 +251,7 @@ func (m *OTelManager) Run(ctx context.Context) error {
// in the case that the configuration is invalid there is no reason to
// try again as it will keep failing so we do not trigger a restart
if m.proc != nil {
-m.proc.Stop(ctx)
+m.proc.Stop(m.stopTimeout)
m.proc = nil
// don't wait here for <-collectorRunErr, already occurred
// clear status, no longer running
@@ -281,7 +287,7 @@ func (m *OTelManager) Run(ctx context.Context) error {
m.components = cfgUpdate.components
m.mx.Unlock()

-err = m.applyMergedConfig(ctx, collectorStatusCh, collectorRunErr)
+err = m.applyMergedConfig(ctx, collectorStatusCh, m.collectorRunErr)
// report the error unconditionally to indicate that the config was applied
reportErr(ctx, m.errCh, err)

@@ -340,7 +346,7 @@ func buildMergedConfig(cfgUpdate configUpdate, agentInfo info.Agent, monitoringC

func (m *OTelManager) applyMergedConfig(ctx context.Context, collectorStatusCh chan *status.AggregateStatus, collectorRunErr chan error) error {
if m.proc != nil {
-m.proc.Stop(ctx)
+m.proc.Stop(m.stopTimeout)
m.proc = nil
select {
case <-collectorRunErr:
@@ -402,6 +408,15 @@ func (m *OTelManager) Update(cfg *confmap.Conf, components []component.Component
collectorCfg: cfg,
components: components,
}

+// we care only about the latest config update
+select {
+case <-m.updateCh:
+case <-m.doneChan:
+return
+default:
+}
+
select {
case m.updateCh <- cfgUpdate:
case <-m.doneChan:
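
The Update change pairs with the newly buffered updateCh (capacity 1): drain any stale update, then enqueue the new one, so the run loop only ever sees the latest configuration and Update does not block behind an old value. The idiom in isolation (a generic sketch, not the manager's actual types):

```go
package sketch

// offerLatest emulates the drain-then-send pattern from Update: with a
// 1-buffered channel, any queued value is discarded before the new one is
// enqueued, so the consumer only ever observes the most recent update.
func offerLatest[T any](ch chan T, done <-chan struct{}, v T) {
	// we care only about the latest value: drop whatever is still queued
	select {
	case <-ch:
	case <-done:
		return
	default:
	}

	select {
	case ch <- v: // hand the latest value to the consumer
	case <-done: // consumer has shut down; give up
	}
}
```

Note that this assumes the drain and send race only with the consumer; with several concurrent producers the send can still block briefly until the run loop drains the buffer.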