Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion internal/pkg/otel/manager/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package manager

import (
"context"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
"go.opentelemetry.io/collector/confmap"
Expand All @@ -18,5 +19,5 @@ type collectorExecution interface {
}

type collectorHandle interface {
Stop(ctx context.Context)
Stop(waitTime time.Duration)
}
6 changes: 4 additions & 2 deletions internal/pkg/otel/manager/execution_embedded.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package manager

import (
"context"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
"go.opentelemetry.io/collector/confmap"
Expand Down Expand Up @@ -66,15 +67,16 @@ type ctxHandle struct {
}

// Stop stops the collector
func (s *ctxHandle) Stop(ctx context.Context) {
func (s *ctxHandle) Stop(waitTime time.Duration) {
if s.cancel == nil {
return
}

s.cancel()

select {
case <-ctx.Done():
case <-time.After(waitTime):
return
case <-s.collectorDoneCh:
}
}
22 changes: 11 additions & 11 deletions internal/pkg/otel/manager/execution_subprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ import (
)

const (
processKillAfter = 5 * time.Second

OtelSetSupervisedFlagName = "supervised"
OtelSupervisedLoggingLevelFlagName = "supervised.logging.level"
)
Expand All @@ -56,6 +54,7 @@ func newSubprocessExecution(logLevel logp.Level, collectorPath string) (*subproc
},
logLevel: logLevel,
healthCheckExtensionID: healthCheckExtensionID,
reportErrFn: reportErr,
}, nil
}

Expand All @@ -64,6 +63,7 @@ type subprocessExecution struct {
collectorArgs []string
logLevel logp.Level
healthCheckExtensionID string
reportErrFn func(ctx context.Context, errCh chan error, err error) // required for testing
}

// startCollector starts a supervised collector and monitors its health. Process exit errors are sent to the
Expand Down Expand Up @@ -106,7 +106,6 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger
procCtx, procCtxCancel := context.WithCancel(ctx)
processInfo, err := process.Start(r.collectorPath,
process.WithArgs(r.collectorArgs),
process.WithContext(procCtx),
process.WithEnv(os.Environ()),
process.WithCmdOptions(func(c *exec.Cmd) error {
c.Stdin = bytes.NewReader(confBytes)
Expand All @@ -130,6 +129,7 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger
ctl := &procHandle{
processDoneCh: make(chan struct{}),
processInfo: processInfo,
log: logger,
}

healthCheckDone := make(chan struct{})
Expand Down Expand Up @@ -196,14 +196,14 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger
if procErr == nil {
if procState.Success() {
// report nil error so that the caller can be notified that the process has exited without error
reportErr(ctx, processErrCh, nil)
r.reportErrFn(ctx, processErrCh, nil)
} else {
reportErr(ctx, processErrCh, fmt.Errorf("supervised collector (pid: %d) exited with error: %s", procState.Pid(), procState.String()))
r.reportErrFn(ctx, processErrCh, fmt.Errorf("supervised collector (pid: %d) exited with error: %s", procState.Pid(), procState.String()))
}
return
}

reportErr(ctx, processErrCh, fmt.Errorf("failed to wait supervised collector process: %w", procErr))
r.reportErrFn(ctx, processErrCh, fmt.Errorf("failed to wait supervised collector process: %w", procErr))
}()

return ctl, nil
Expand All @@ -212,11 +212,12 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger
type procHandle struct {
processDoneCh chan struct{}
processInfo *process.Info
log *logger.Logger
}

// Stop stops the process. If the process is already stopped, it does nothing. If the process does not stop within
// processKillAfter or due to an error, it will be killed.
func (s *procHandle) Stop(ctx context.Context) {
func (s *procHandle) Stop(waitTime time.Duration) {
select {
case <-s.processDoneCh:
// process has already exited
Expand All @@ -225,19 +226,18 @@ func (s *procHandle) Stop(ctx context.Context) {
}

if err := s.processInfo.Stop(); err != nil {
s.log.Warnf("failed to send stop signal to the supervised collector: %v", err)
// we failed to stop the process just kill it and return
_ = s.processInfo.Kill()
return
}

select {
case <-ctx.Done():
case <-time.After(waitTime):
s.log.Warnf("timeout waiting (%s) for the supervised collector to stop, killing it", waitTime.String())
// our caller ctx is Done; kill the process just in case
_ = s.processInfo.Kill()
case <-s.processDoneCh:
// process has already exited
case <-time.After(processKillAfter):
// process is still running kill it
_ = s.processInfo.Kill()
}
}
44 changes: 28 additions & 16 deletions internal/pkg/otel/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type ExecutionMode string
const (
SubprocessExecutionMode ExecutionMode = "subprocess"
EmbeddedExecutionMode ExecutionMode = "embedded"
// waitTimeForStop is the time to wait for the collector to stop before killing it.
waitTimeForStop = 30 * time.Second
Comment thread
cmacknz marked this conversation as resolved.
Outdated
)

type collectorRecoveryTimer interface {
Expand Down Expand Up @@ -101,6 +103,9 @@ type OTelManager struct {
execution collectorExecution

proc collectorHandle

// collectorRunErr is used to signal that the collector has exited.
collectorRunErr chan error
}

// NewOTelManager returns a OTelManager.
Expand Down Expand Up @@ -131,7 +136,7 @@ func NewOTelManager(
recoveryTimer = newRestarterNoop()
exec = newExecutionEmbedded()
default:
return nil, errors.New("unknown otel collector exec")
return nil, fmt.Errorf("unknown otel collector execution mode: %q", mode)
}

logger.Debugf("Using collector execution mode: %s", mode)
Expand All @@ -144,10 +149,11 @@ func NewOTelManager(
errCh: make(chan error, 1), // holds at most one error
collectorStatusCh: make(chan *status.AggregateStatus, 1),
componentStateCh: make(chan []runtime.ComponentComponentState, 1),
updateCh: make(chan configUpdate),
updateCh: make(chan configUpdate, 1),
doneChan: make(chan struct{}),
execution: exec,
recoveryTimer: recoveryTimer,
collectorRunErr: make(chan error),
}, nil
}

Expand All @@ -156,24 +162,21 @@ func (m *OTelManager) Run(ctx context.Context) error {
var err error
m.proc = nil

// signal that the run loop is ended to unblock any incoming update calls
defer close(m.doneChan)

// collectorRunErr is used to signal that the collector has exited.
collectorRunErr := make(chan error)

// collectorStatusCh is used internally by the otel collector to send status updates to the manager
// this channel is buffered because it's possible for the collector to send a status update while the manager is
// waiting for the collector to exit
collectorStatusCh := make(chan *status.AggregateStatus, 1)
for {
select {
case <-ctx.Done():
// signal that the run loop is ended to unblock any incoming update calls
close(m.doneChan)

m.recoveryTimer.Stop()
// our caller context is cancelled so stop the collector and return
// has exited.
if m.proc != nil {
m.proc.Stop(ctx)
m.proc.Stop(waitTimeForStop)
}
return ctx.Err()
case <-m.recoveryTimer.C():
Expand All @@ -187,7 +190,7 @@ func (m *OTelManager) Run(ctx context.Context) error {

newRetries := m.recoveryRetries.Add(1)
m.logger.Infof("collector recovery restarting, total retries: %d", newRetries)
m.proc, err = m.execution.startCollector(ctx, m.baseLogger, m.mergedCollectorCfg, collectorRunErr, collectorStatusCh)
m.proc, err = m.execution.startCollector(ctx, m.baseLogger, m.mergedCollectorCfg, m.collectorRunErr, collectorStatusCh)
if err != nil {
reportErr(ctx, m.errCh, err)
// reset the restart timer to the next backoff
Expand All @@ -197,12 +200,12 @@ func (m *OTelManager) Run(ctx context.Context) error {
reportErr(ctx, m.errCh, nil)
}

case err = <-collectorRunErr:
case err = <-m.collectorRunErr:
m.recoveryTimer.Stop()
if err == nil {
// err is nil means that the collector has exited cleanly without an error
if m.proc != nil {
m.proc.Stop(ctx)
m.proc.Stop(waitTimeForStop)
m.proc = nil
updateErr := m.reportOtelStatusUpdate(ctx, nil)
if updateErr != nil {
Expand All @@ -223,7 +226,7 @@ func (m *OTelManager) Run(ctx context.Context) error {

// in this rare case the collector stopped running but a configuration was
// provided and the collector stopped with a clean exit
m.proc, err = m.execution.startCollector(ctx, m.baseLogger, m.mergedCollectorCfg, collectorRunErr, collectorStatusCh)
m.proc, err = m.execution.startCollector(ctx, m.baseLogger, m.mergedCollectorCfg, m.collectorRunErr, collectorStatusCh)
if err != nil {
// failed to create the collector (this is different then
// it's failing to run). we do not retry creation on failure
Expand All @@ -245,7 +248,7 @@ func (m *OTelManager) Run(ctx context.Context) error {
// in the case that the configuration is invalid there is no reason to
// try again as it will keep failing so we do not trigger a restart
if m.proc != nil {
m.proc.Stop(ctx)
m.proc.Stop(waitTimeForStop)
m.proc = nil
// don't wait here for <-collectorRunErr, already occurred
// clear status, no longer running
Expand Down Expand Up @@ -281,7 +284,7 @@ func (m *OTelManager) Run(ctx context.Context) error {
m.components = cfgUpdate.components
m.mx.Unlock()

err = m.applyMergedConfig(ctx, collectorStatusCh, collectorRunErr)
err = m.applyMergedConfig(ctx, collectorStatusCh, m.collectorRunErr)
// report the error unconditionally to indicate that the config was applied
reportErr(ctx, m.errCh, err)

Expand Down Expand Up @@ -340,7 +343,7 @@ func buildMergedConfig(cfgUpdate configUpdate, agentInfo info.Agent, monitoringC

func (m *OTelManager) applyMergedConfig(ctx context.Context, collectorStatusCh chan *status.AggregateStatus, collectorRunErr chan error) error {
if m.proc != nil {
m.proc.Stop(ctx)
m.proc.Stop(waitTimeForStop)
m.proc = nil
select {
case <-collectorRunErr:
Expand Down Expand Up @@ -402,6 +405,15 @@ func (m *OTelManager) Update(cfg *confmap.Conf, components []component.Component
collectorCfg: cfg,
components: components,
}

// we care only about the latest config update
select {
Comment thread
swiatekm marked this conversation as resolved.
case <-m.updateCh:
case <-m.doneChan:
return
default:
}
Comment thread
pchila marked this conversation as resolved.

select {
case m.updateCh <- cfgUpdate:
case <-m.doneChan:
Expand Down
Loading
Loading