diff --git a/changelog/fragments/1766556100-reload-log-level.yaml b/changelog/fragments/1766556100-reload-log-level.yaml new file mode 100644 index 00000000000..9f2e86b3551 --- /dev/null +++ b/changelog/fragments/1766556100-reload-log-level.yaml @@ -0,0 +1,45 @@ +# REQUIRED +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: bug-fix + +# REQUIRED for all kinds +# Change summary; a 80ish characters long description of the change. +summary: Fix reloading agent.logging.level for standalone Elastic Agent + +# REQUIRED for breaking-change, deprecation, known-issue +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# description: + +# REQUIRED for breaking-change, deprecation, known-issue +# impact: + +# REQUIRED for breaking-change, deprecation, known-issue +# action: + +# REQUIRED for all kinds +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: elastic-agent + +# AUTOMATED +# OPTIONAL to manually add other PR URLs +# PR URL: A link the PR that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +# pr: https://github.com/owner/repo/1234 + +# AUTOMATED +# OPTIONAL to manually add other issue URLs +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +# issue: https://github.com/owner/repo/1234 diff --git a/internal/pkg/agent/application/application.go b/internal/pkg/agent/application/application.go index a83e5d6ca94..c16738dc0b0 100644 --- a/internal/pkg/agent/application/application.go +++ b/internal/pkg/agent/application/application.go @@ -240,7 +240,22 @@ func New( return nil, nil, nil, errors.New(err, "failed to initialize composable controller") } +<<<<<<< HEAD otelManager := otelmanager.NewOTelManager(log.Named("otel_manager"), baseLogger) +======= + otelManager, err := otelmanager.NewOTelManager( + log.Named("otel_manager"), + logLevel, + baseLogger, + agentInfo, + cfg.Settings.Collector, + monitor.ComponentMonitoringConfig, + otelmanager.CollectorStopTimeout, + ) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to create otel manager: %w", err) + } +>>>>>>> 85b7e9932 ((bugfix) log level does not change when standalone agent is reloaded or when otel runtime is used (#11998)) coord := coordinator.New(log, cfg, logLevel, agentInfo, specs, reexec, upgrader, runtime, configMgr, varsManager, caps, monitor, isManaged, otelManager, actionAcker, initialUpgradeDetails, compModifiers...) 
if managed != nil { // the coordinator requires the config manager as well as in managed-mode the config manager requires the diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index c8ec393e004..ae2a21a4a9d 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -139,8 +139,13 @@ type RuntimeManager interface { type OTelManager interface { Runner +<<<<<<< HEAD // Update updates the current configuration for OTel. Update(cfg *confmap.Conf) +======= + // Update updates the current plain configuration for the otel collector and components. + Update(*confmap.Conf, *monitoringCfg.MonitoringConfig, logp.Level, []component.Component) +>>>>>>> 85b7e9932 ((bugfix) log level does not change when standalone agent is reloaded or when otel runtime is used (#11998)) // Watch returns the chanel to watch for configuration changes. Watch() <-chan *status.AggregateStatus @@ -1458,10 +1463,20 @@ func (c *Coordinator) processConfigAgent(ctx context.Context, cfg *config.Config // c.setProtection(protectionConfig) - if c.vars != nil { - return c.refreshComponentModel(ctx) + // check if log level has changed for standalone elastic-agent + // we'd have to update both the periodic and once config watchers and refactor initialization in application.go to do otherwise. + if c.agentInfo.IsStandalone() { + ll := currentCfg.Settings.LoggingConfig.Level + if ll != c.state.LogLevel { + // set log level for the coordinator + c.setLogLevel(ll) + // set global log level + logger.SetLevel(ll) + c.logger.Infof("log level changed to %s", ll.String()) + } } - return nil + + return c.refreshComponentModel(ctx) } // Generate the AST for a new incoming configuration and, if successful, @@ -1574,10 +1589,13 @@ func (c *Coordinator) processVars(ctx context.Context, vars []*transpiler.Vars) // Called on the main Coordinator goroutine. func (c *Coordinator) processLogLevel(ctx context.Context, ll logp.Level) { - c.setLogLevel(ll) - err := c.refreshComponentModel(ctx) - if err != nil { - c.logger.Errorf("updating log level: %s", err.Error()) + // do not refresh component model if log level did not change + if c.state.LogLevel != ll { + c.setLogLevel(ll) + err := c.refreshComponentModel(ctx) + if err != nil { + c.logger.Errorf("updating log level: %s", err.Error()) + } } } @@ -1650,6 +1668,7 @@ func (c *Coordinator) updateOtelManagerConfig(model *component.Model) error { } c.logger.With("component_ids", componentIDs).Warn("The Otel runtime manager is HIGHLY EXPERIMENTAL and only intended for testing. Use at your own risk.") } +<<<<<<< HEAD if componentOtelCfg != nil { err := finalOtelCfg.Merge(componentOtelCfg) if err != nil { @@ -1672,6 +1691,9 @@ func (c *Coordinator) updateOtelManagerConfig(model *component.Model) error { c.otelMgr.Update(finalOtelCfg) c.finalOtelCfg = finalOtelCfg return nil +======= + c.otelMgr.Update(c.otelCfg, c.currentCfg.Settings.MonitoringConfig, c.state.LogLevel, otelModel.Components) +>>>>>>> 85b7e9932 ((bugfix) log level does not change when standalone agent is reloaded or when otel runtime is used (#11998)) } // splitModelBetweenManager splits the model components between the runtime manager and the otel manager. 
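Reviewer note on the coordinator hunk above: the standalone reload path only touches the logger when the level actually changed, and `processLogLevel` now applies the same changed-level guard before calling `refreshComponentModel`. A minimal sketch of that guard follows; the helper name and the `apply` callback are made up for illustration and are not part of this PR.

```go
import "github.com/elastic/elastic-agent-libs/logp"

// maybeApplyLogLevel mirrors the guard in processConfigAgent: only standalone agents
// take the level from the local policy, and an unchanged level must not trigger
// another component-model refresh.
func maybeApplyLogLevel(isStandalone bool, current, incoming logp.Level, apply func(logp.Level)) bool {
	if !isStandalone || incoming == current {
		return false
	}
	// e.g. c.setLogLevel(incoming) followed by the global logger.SetLevel(incoming)
	apply(incoming)
	return true
}
```

Fleet-managed agents keep receiving their level through settings actions, which is why the check is gated on `c.agentInfo.IsStandalone()`.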
diff --git a/internal/pkg/agent/application/coordinator/coordinator_test.go b/internal/pkg/agent/application/coordinator/coordinator_test.go index 59cbc25cace..a4173973c06 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_test.go @@ -1396,6 +1396,44 @@ func (f *fakeOTelManager) Run(ctx context.Context) error { } func (f *fakeOTelManager) Errors() <-chan error { +<<<<<<< HEAD +======= + return f.errChan +} + +func (f *fakeOTelManager) Update(cfg *confmap.Conf, monitoring *monitoringCfg.MonitoringConfig, ll logp.Level, components []component.Component) { + var collectorResult, componentResult error + if f.updateCollectorCallback != nil { + collectorResult = f.updateCollectorCallback(cfg) + } + if f.errChan != nil && collectorResult != nil { + // If a reporting channel is set, send the collectorResult to it + f.errChan <- collectorResult + } + if f.updateComponentCallback != nil { + componentResult = f.updateComponentCallback(components) + } + if f.errChan != nil && componentResult != nil { + // If a reporting channel is set, send the componentResult to it + f.errChan <- componentResult + } +} + +func (f *fakeOTelManager) WatchCollector() <-chan *status.AggregateStatus { + return f.collectorStatusChan +} + +func (f *fakeOTelManager) WatchComponents() <-chan []runtime.ComponentComponentState { + return f.componentStateChan +} + +func (f *fakeOTelManager) MergedOtelConfig() *confmap.Conf { return nil } + +func (f *fakeOTelManager) PerformDiagnostics(ctx context.Context, reqs ...runtime.ComponentUnitDiagnosticRequest) []runtime.ComponentUnitDiagnostic { + if f.performDiagnosticsCallback != nil { + return f.performDiagnosticsCallback(ctx, reqs...) + } +>>>>>>> 85b7e9932 ((bugfix) log level does not change when standalone agent is reloaded or when otel runtime is used (#11998)) return nil } diff --git a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go index 73ee7251484..63386e031fc 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go @@ -452,11 +452,18 @@ func TestCoordinatorReportsInvalidPolicy(t *testing.T) { } }() +<<<<<<< HEAD upgradeMgr, err := upgrade.NewUpgrader( log, &artifact.Config{}, &info.AgentInfo{}, ) +======= + tmpDir := t.TempDir() + agentInfo, err := info.NewAgentInfo(ctx, false) + require.NoError(t, err) + upgradeMgr, err := upgrade.NewUpgrader(log, &artifact.Config{}, nil, agentInfo, new(upgrade.AgentWatcherHelper), ttl.NewTTLMarkerRegistry(nil, tmpDir)) +>>>>>>> 85b7e9932 ((bugfix) log level does not change when standalone agent is reloaded or when otel runtime is used (#11998)) require.NoError(t, err, "errored when creating a new upgrader") // Channels have buffer length 1, so we don't have to run on multiple @@ -488,6 +495,7 @@ func TestCoordinatorReportsInvalidPolicy(t *testing.T) { ast: emptyAST(t), componentPIDTicker: time.NewTicker(time.Second * 30), secretMarkerFunc: testSecretMarkerFunc, + agentInfo: agentInfo, } // Send an invalid config update and confirm that Coordinator reports @@ -580,6 +588,9 @@ func TestCoordinatorReportsComponentModelError(t *testing.T) { defer cancel() logger := logp.NewLogger("testing") + agentInfo, err := info.NewAgentInfo(t.Context(), false) + require.NoError(t, err) + // Channels have buffer length 1 so we don't have to run on multiple // goroutines. 
stateChan := make(chan State, 1) @@ -605,6 +616,7 @@ func TestCoordinatorReportsComponentModelError(t *testing.T) { ast: emptyAST(t), componentPIDTicker: time.NewTicker(time.Second * 30), secretMarkerFunc: testSecretMarkerFunc, + agentInfo: agentInfo, } // This configuration produces a valid AST but its EQL condition is diff --git a/internal/pkg/agent/application/periodic.go b/internal/pkg/agent/application/periodic.go index b03b6496278..8aec71494dc 100644 --- a/internal/pkg/agent/application/periodic.go +++ b/internal/pkg/agent/application/periodic.go @@ -17,6 +17,7 @@ import ( "github.com/elastic/elastic-agent/pkg/core/logger" ) +// periodic checks for local configuration changes type periodic struct { log *logger.Logger period time.Duration @@ -147,6 +148,7 @@ func newPeriodic( } } +// localConfigChange implements coordinator.ConfigChange for local file changes. type localConfigChange struct { cfg *config.Config } diff --git a/internal/pkg/agent/cmd/run.go b/internal/pkg/agent/cmd/run.go index 7fc0009b8fe..3c8eaf650fd 100644 --- a/internal/pkg/agent/cmd/run.go +++ b/internal/pkg/agent/cmd/run.go @@ -302,19 +302,17 @@ func runElasticAgent( errors.M(errors.MetaKeyPath, pathConfigFile)) } + // Set the initial log level (either default or from config file) + logger.SetLevel(logLvl) + // Ensure that the log level now matches what is configured in the agentInfo. - if agentInfo.LogLevel() != "" { - var lvl logp.Level - err = lvl.Unpack(agentInfo.LogLevel()) - if err != nil { - l.Error(errors.New(err, "failed to parse agent information log level")) - } else { - logLvl = lvl - logger.SetLevel(lvl) - } + var lvl logp.Level + err = lvl.Unpack(agentInfo.LogLevel()) + if err != nil { + l.Error(errors.New(err, "failed to parse agent information log level")) } else { - // Set the initial log level (either default or from config file) - logger.SetLevel(logLvl) + logLvl = lvl + logger.SetLevel(lvl) } // initiate agent watcher diff --git a/internal/pkg/otel/manager/execution.go b/internal/pkg/otel/manager/execution.go new file mode 100644 index 00000000000..f3d425b1685 --- /dev/null +++ b/internal/pkg/otel/manager/execution.go @@ -0,0 +1,36 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package manager + +import ( + "context" + "time" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status" + "go.opentelemetry.io/collector/confmap" + + "github.com/elastic/elastic-agent/pkg/core/logger" +) + +type collectorExecution interface { + // startCollector starts the otel collector with the given arguments, returning a handle allowing it to be stopped. + // Cancelling the context will stop all goroutines involved in the execution. + // The collector will report status events in the statusCh channel and errors on errCh in a non-blocking fashion, + // draining the channel before writing to it. + // After the collector exits, it will emit an error describing the exit status (nil if successful) and a nil status. + // Parameters: + // - cfg: Configuration for the collector. + // - errCh: Process exit errors are sent to the errCh channel + // - statusCh: Collector's status updates are sent to statusCh channel. + // - forceFetchStatusCh: Channel that is used to trigger a forced status update. 
+ startCollector(ctx context.Context, logLevel string, baseLogger *logger.Logger, logger *logger.Logger, cfg *confmap.Conf, errCh chan error, statusCh chan *status.AggregateStatus, forceFetchStatusCh chan struct{}) (collectorHandle, error) +} + +type collectorHandle interface { + // Stop stops and waits for collector to exit gracefully within the given duration. Note that if the collector + // doesn't exit within that time, it will be killed and then it will wait an extra second for it to ensure it's + // really stopped. + Stop(waitTime time.Duration) +} diff --git a/internal/pkg/otel/manager/execution_subprocess.go b/internal/pkg/otel/manager/execution_subprocess.go new file mode 100644 index 00000000000..09a15e4841f --- /dev/null +++ b/internal/pkg/otel/manager/execution_subprocess.go @@ -0,0 +1,397 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package manager + +import ( + "bytes" + "context" + "errors" + "fmt" + "os" + "os/exec" + "sync" + "time" + + "go.opentelemetry.io/collector/component" + "gopkg.in/yaml.v3" + + componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status" + "go.opentelemetry.io/collector/component/componentstatus" + "go.opentelemetry.io/collector/confmap" + "go.uber.org/zap/zapcore" + + "github.com/elastic/elastic-agent-libs/logp" + + "github.com/elastic/elastic-agent/internal/pkg/otel/monitoring" + runtimeLogger "github.com/elastic/elastic-agent/pkg/component/runtime" + "github.com/elastic/elastic-agent/pkg/core/logger" + "github.com/elastic/elastic-agent/pkg/core/process" +) + +const ( + OtelSetSupervisedFlagName = "supervised" + OtelSupervisedLoggingLevelFlagName = "supervised.logging.level" + OtelSupervisedMonitoringURLFlagName = "supervised.monitoring.url" +) + +// newSubprocessExecution creates a new execution which runs the otel collector in a subprocess. A metricsPort or +// healthCheckPort of 0 will result in a random port being used. +func newSubprocessExecution(collectorPath string, uuid string, metricsPort int, healthCheckPort int) (*subprocessExecution, error) { + componentType, err := component.NewType(healthCheckExtensionName) + if err != nil { + return nil, fmt.Errorf("cannot create component type: %w", err) + } + healthCheckExtensionID := component.NewIDWithName(componentType, uuid).String() + + return &subprocessExecution{ + collectorPath: collectorPath, + collectorArgs: []string{ + fmt.Sprintf("--%s", OtelSetSupervisedFlagName), + fmt.Sprintf("--%s=%s", OtelSupervisedMonitoringURLFlagName, monitoring.EDOTMonitoringEndpoint()), + }, + healthCheckExtensionID: healthCheckExtensionID, + collectorMetricsPort: metricsPort, + collectorHealthCheckPort: healthCheckPort, + reportErrFn: reportErr, + }, nil +} + +// subprocessExecution implements collectorExecution by running the collector in a subprocess. +type subprocessExecution struct { + collectorPath string + collectorArgs []string + healthCheckExtensionID string + collectorMetricsPort int + collectorHealthCheckPort int + reportErrFn func(ctx context.Context, errCh chan error, err error) // required for testing +} + +// startCollector starts a supervised collector and monitors its health. Process exit errors are sent to the +// processErrCh channel. 
Other run errors, such as not able to connect to the health endpoint, are sent to the runErrCh channel. +func (r *subprocessExecution) startCollector( + ctx context.Context, + logLevel string, + baseLogger *logger.Logger, + logger *logger.Logger, + cfg *confmap.Conf, + processErrCh chan error, + statusCh chan *status.AggregateStatus, + forceFetchStatusCh chan struct{}, +) (collectorHandle, error) { + var lvl logp.Level + err := lvl.Unpack(logLevel) + if err != nil { + return nil, fmt.Errorf("failed to unpack the log level '%s': %w", logLevel, err) + } + + if cfg == nil { + // configuration is required + return nil, errors.New("no configuration provided") + } + + if r.collectorPath == "" { + // collector path is required + return nil, errors.New("no collector path provided") + } + + if _, err := os.Stat(r.collectorPath); err != nil { + // we cannot access the collector path + return nil, fmt.Errorf("cannot access collector path: %w", err) + } + + httpHealthCheckPort, collectorMetricsPort, err := r.getCollectorPorts() + if err != nil { + return nil, fmt.Errorf("could not find port for collector: %w", err) + } + + if err := injectHealthCheckV2Extension(cfg, r.healthCheckExtensionID, httpHealthCheckPort); err != nil { + return nil, fmt.Errorf("failed to inject health check extension: %w", err) + } + + confMap := cfg.ToStringMap() + confBytes, err := yaml.Marshal(confMap) + if err != nil { + return nil, fmt.Errorf("failed to marshal config to yaml: %w", err) + } + + stdOutLast := newZapLast(baseLogger.Core()) + stdOut := runtimeLogger.NewLogWriterWithDefaults(stdOutLast, zapcore.Level(lvl)) + // info level for stdErr because by default collector writes to stderr + stdErrLast := newZapLast(baseLogger.Core()) + stdErr := runtimeLogger.NewLogWriterWithDefaults(stdErrLast, zapcore.Level(lvl)) + + procCtx, procCtxCancel := context.WithCancel(ctx) + env := os.Environ() + // Set the environment variable for the collector metrics port. See comment at the constant definition for more information. 
+ env = append(env, fmt.Sprintf("%s=%d", componentmonitoring.OtelCollectorMetricsPortEnvVarName, collectorMetricsPort)) + + // set collector args + collectorArgs := append(r.collectorArgs, fmt.Sprintf("--%s=%s", OtelSupervisedLoggingLevelFlagName, lvl)) + + processInfo, err := process.Start(r.collectorPath, + process.WithArgs(collectorArgs), + process.WithEnv(env), + process.WithCmdOptions(func(c *exec.Cmd) error { + c.Stdin = bytes.NewReader(confBytes) + c.Stdout = stdOut + c.Stderr = stdErr + return nil + }), + ) + if err != nil { + // we failed to start the process + procCtxCancel() + return nil, fmt.Errorf("failed to start supervised collector: %w", err) + } + logger.Infof("supervised collector started with pid: %d and healthcheck port: %d", processInfo.Process.Pid, httpHealthCheckPort) + if processInfo.Process == nil { + // this should not happen but just in case + procCtxCancel() + return nil, fmt.Errorf("failed to start supervised collector: process is nil") + } + + ctl := &procHandle{ + processDoneCh: make(chan struct{}), + processInfo: processInfo, + log: logger, + } + + healthCheckDone := make(chan struct{}) + go func() { + defer func() { + close(healthCheckDone) + }() + currentStatus := aggregateStatus(componentstatus.StatusStarting, nil) + r.reportSubprocessCollectorStatus(ctx, statusCh, currentStatus) + + // specify a max duration of not being able to get the status from the collector + const maxFailuresDuration = 130 * time.Second + maxFailuresTimer := time.NewTimer(maxFailuresDuration) + defer maxFailuresTimer.Stop() + + // check the health of the collector every 1 second + const healthCheckPollDuration = 1 * time.Second + healthCheckPollTimer := time.NewTimer(healthCheckPollDuration) + defer healthCheckPollTimer.Stop() + for { + statuses, err := AllComponentsStatuses(procCtx, httpHealthCheckPort) + if err != nil { + switch { + case errors.Is(err, context.Canceled): + // after the collector exits, we need to report a nil status + r.reportSubprocessCollectorStatus(ctx, statusCh, nil) + return + default: + // if we face any other error (most likely, connection refused), log the error. + logger.Debugf("Received an unexpected error while fetching component status: %v", err) + } + } else { + maxFailuresTimer.Reset(maxFailuresDuration) + removeManagedHealthCheckExtensionStatus(statuses, r.healthCheckExtensionID) + if !compareStatuses(currentStatus, statuses) { + currentStatus = statuses + r.reportSubprocessCollectorStatus(procCtx, statusCh, statuses) + } + } + + select { + case <-procCtx.Done(): + // after the collector exits, we need to report a nil status + r.reportSubprocessCollectorStatus(ctx, statusCh, nil) + return + case <-forceFetchStatusCh: + r.reportSubprocessCollectorStatus(procCtx, statusCh, statuses) + case <-healthCheckPollTimer.C: + healthCheckPollTimer.Reset(healthCheckPollDuration) + case <-maxFailuresTimer.C: + failedToConnectStatuses := aggregateStatus( + componentstatus.StatusRecoverableError, + errors.New("failed to connect to collector"), + ) + if !compareStatuses(currentStatus, failedToConnectStatuses) { + currentStatus = statuses + r.reportSubprocessCollectorStatus(procCtx, statusCh, statuses) + } + } + } + }() + + go func() { + procState, procErr := processInfo.Process.Wait() + logger.Debugf("wait for pid %d returned", processInfo.PID) + procCtxCancel() + <-healthCheckDone + close(ctl.processDoneCh) + // using ctx instead of procCtx in the reportErr functions below is intentional. 
This allows us to report + // errors to the caller through processErrCh and essentially discard any other errors that occurred because + // the process exited. + if procErr == nil { + if procState.Success() { + // report nil error so that the caller can be notified that the process has exited without error + r.reportErrFn(ctx, processErrCh, nil) + } else { + var procReportErr error + stderrMsg := stdErrLast.Last().Message + stdoutMsg := stdOutLast.Last().Message + if stderrMsg != "" { + // use stderr message as the error + procReportErr = errors.New(stderrMsg) + } else if stdoutMsg != "" { + // use last stdout message as the error + procReportErr = errors.New(stdoutMsg) + } else { + // neither case use standard process error + procReportErr = fmt.Errorf("supervised collector (pid: %d) exited with error: %s", procState.Pid(), procState.String()) + } + r.reportErrFn(ctx, processErrCh, procReportErr) + } + return + } + + r.reportErrFn(ctx, processErrCh, fmt.Errorf("failed to wait supervised collector process: %w", procErr)) + }() + + return ctl, nil +} + +// cloneCollectorStatus creates a deep copy of the provided AggregateStatus. +func cloneCollectorStatus(aStatus *status.AggregateStatus) *status.AggregateStatus { + if aStatus == nil { + return nil + } + + st := &status.AggregateStatus{ + Event: aStatus.Event, + } + + if len(aStatus.ComponentStatusMap) > 0 { + st.ComponentStatusMap = make(map[string]*status.AggregateStatus, len(aStatus.ComponentStatusMap)) + for k, cs := range aStatus.ComponentStatusMap { + st.ComponentStatusMap[k] = cloneCollectorStatus(cs) + } + } + + return st +} + +func (r *subprocessExecution) reportSubprocessCollectorStatus(ctx context.Context, statusCh chan *status.AggregateStatus, collectorStatus *status.AggregateStatus) { + // we need to clone the status to prevent any mutation on the receiver side + // affecting the original ref + clonedStatus := cloneCollectorStatus(collectorStatus) + reportCollectorStatus(ctx, statusCh, clonedStatus) +} + +// getCollectorPorts returns the ports used by the OTel collector. If the ports set in the execution struct are 0, +// random ports are returned instead. +func (r *subprocessExecution) getCollectorPorts() (healthCheckPort int, metricsPort int, err error) { + randomPorts := make([]*int, 0, 2) + // if the ports are defined (non-zero), use them + if r.collectorMetricsPort == 0 { + randomPorts = append(randomPorts, &metricsPort) + } else { + metricsPort = r.collectorMetricsPort + } + if r.collectorHealthCheckPort == 0 { + randomPorts = append(randomPorts, &healthCheckPort) + } else { + healthCheckPort = r.collectorHealthCheckPort + } + + if len(randomPorts) == 0 { + return healthCheckPort, metricsPort, nil + } + + // we need at least one random port, create it + ports, err := findRandomTCPPorts(len(randomPorts)) + if err != nil { + return 0, 0, err + } + for i, port := range ports { + *randomPorts[i] = port + } + return healthCheckPort, metricsPort, nil +} + +func removeManagedHealthCheckExtensionStatus(status *status.AggregateStatus, healthCheckExtensionID string) { + extensions, exists := status.ComponentStatusMap["extensions"] + if !exists { + return + } + + extensionID := "extension:" + healthCheckExtensionID + delete(extensions.ComponentStatusMap, extensionID) +} + +type procHandle struct { + processDoneCh chan struct{} + processInfo *process.Info + log *logger.Logger +} + +// Stop stops the process. If the process is already stopped, it does nothing. 
If the process does not stop within +// processKillAfter or due to an error, it will be killed. +func (s *procHandle) Stop(waitTime time.Duration) { + select { + case <-s.processDoneCh: + // process has already exited + return + default: + } + + s.log.Debugf("gracefully stopping pid %d", s.processInfo.PID) + if err := s.processInfo.Stop(); err != nil { + s.log.Warnf("failed to send stop signal to the supervised collector: %v", err) + // we failed to stop the process just kill it and return + } else { + select { + case <-time.After(waitTime): + s.log.Warnf("timeout waiting (%s) for the supervised collector to stop, killing it", waitTime.String()) + case <-s.processDoneCh: + // process has already exited + return + } + } + + // since we are here this means that the process either got an error at stop or did not stop within the timeout, + // kill it and give one more mere second for the process wait to be called + _ = s.processInfo.Kill() + select { + case <-time.After(1 * time.Second): + s.log.Warnf("supervised collector subprocess didn't exit in time after killing it") + case <-s.processDoneCh: + } +} + +type zapWriter interface { + Write(zapcore.Entry, []zapcore.Field) error +} +type zapLast struct { + wrapped zapWriter + last zapcore.Entry + mx sync.Mutex +} + +func newZapLast(w zapWriter) *zapLast { + return &zapLast{ + wrapped: w, + } +} + +// Write stores the most recent log entry. +func (z *zapLast) Write(entry zapcore.Entry, fields []zapcore.Field) error { + z.mx.Lock() + z.last = entry + z.mx.Unlock() + return z.wrapped.Write(entry, fields) +} + +// Last returns the last log entry. +func (z *zapLast) Last() zapcore.Entry { + z.mx.Lock() + defer z.mx.Unlock() + return z.last +} diff --git a/internal/pkg/otel/manager/manager.go b/internal/pkg/otel/manager/manager.go index a867b0fc993..1a0ab7831bb 100644 --- a/internal/pkg/otel/manager/manager.go +++ b/internal/pkg/otel/manager/manager.go @@ -20,6 +20,35 @@ import ( "github.com/elastic/elastic-agent/pkg/core/logger" ) +<<<<<<< HEAD +======= +const ( + // CollectorStopTimeout is the duration to wait for the collector to stop. Note: this needs to be shorter + // than 5 * time.Second (coordinator.managerShutdownTimeout) otherwise we might end up with a defunct process. + CollectorStopTimeout = 3 * time.Second +) + +type collectorRecoveryTimer interface { + // IsStopped returns true if the timer is stopped + IsStopped() bool + // Stop stops the timer + Stop() + // ResetInitial resets the timer to the initial interval + ResetInitial() time.Duration + // ResetNext resets the timer to the next interval + ResetNext() time.Duration + // C returns the timer channel + C() <-chan time.Time +} + +type configUpdate struct { + collectorCfg *confmap.Conf + monitoringCfg *monitoringCfg.MonitoringConfig + components []component.Component + logLevel logp.Level +} + +>>>>>>> 85b7e9932 ((bugfix) log level does not change when standalone agent is reloaded or when otel runtime is used (#11998)) // OTelManager is a manager that manages the lifecycle of the OTel collector inside of the Elastic Agent. type OTelManager struct { // baseLogger is the base logger for the otel collector, and doesn't include any agent-specific fields. @@ -41,6 +70,7 @@ type OTelManager struct { // doneChan is closed when Run is stopped to signal that any // pending update calls should be ignored. doneChan chan struct{} +<<<<<<< HEAD } // NewOTelManager returns a OTelManager. 
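Reviewer note on the `collectorRecoveryTimer` interface introduced in the manager.go hunk above: the interface only fixes the contract (stop, reset to the initial or next backoff interval, expose the timer channel). The sketch below is a hypothetical doubling-backoff implementation to illustrate that contract; the PR's actual implementation is `newRecoveryBackoff`, and all names here are invented.

```go
import "time"

// doublingRecoveryTimer is an illustrative collectorRecoveryTimer: each ResetNext
// doubles the delay up to maxDelay.
type doublingRecoveryTimer struct {
	timer    *time.Timer
	initial  time.Duration
	maxDelay time.Duration
	current  time.Duration
	stopped  bool
}

func newDoublingRecoveryTimer(initial, maxDelay time.Duration) *doublingRecoveryTimer {
	t := time.NewTimer(initial)
	if !t.Stop() {
		<-t.C // drain so the first Reset* call starts from a clean timer
	}
	return &doublingRecoveryTimer{timer: t, initial: initial, maxDelay: maxDelay, current: initial, stopped: true}
}

func (d *doublingRecoveryTimer) C() <-chan time.Time { return d.timer.C }
func (d *doublingRecoveryTimer) IsStopped() bool     { return d.stopped }

func (d *doublingRecoveryTimer) Stop() {
	if !d.timer.Stop() {
		select { // drain a fire that raced with Stop
		case <-d.timer.C:
		default:
		}
	}
	d.stopped = true
}

// ResetInitial arms the timer with the first backoff interval and returns it.
func (d *doublingRecoveryTimer) ResetInitial() time.Duration {
	d.current = d.initial
	d.timer.Reset(d.current)
	d.stopped = false
	return d.current
}

// ResetNext doubles the interval (capped at maxDelay), arms the timer, and returns it.
func (d *doublingRecoveryTimer) ResetNext() time.Duration {
	d.current *= 2
	if d.current > d.maxDelay {
		d.current = d.maxDelay
	}
	d.timer.Reset(d.current)
	d.stopped = false
	return d.current
}
```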
@@ -53,6 +83,94 @@ func NewOTelManager(logger, baseLogger *logger.Logger) *OTelManager { statusCh: make(chan *status.AggregateStatus), doneChan: make(chan struct{}), } +======= + + // recoveryTimer is used to restart the collector when it has errored. + recoveryTimer collectorRecoveryTimer + + // recoveryRetries is the number of times the collector has been + // restarted through the recovery timer. + recoveryRetries atomic.Uint32 + + // execution is used to invoke the collector into different execution modes + execution collectorExecution + + proc collectorHandle + + // collectorRunErr is used to signal that the collector has exited. + collectorRunErr chan error + + // stopTimeout is the timeout to wait for the collector to stop. + stopTimeout time.Duration + + // log level of the collector + logLevel string +} + +// NewOTelManager returns a OTelManager. +func NewOTelManager( + logger *logger.Logger, + logLevel logp.Level, + baseLogger *logger.Logger, + agentInfo info.Agent, + agentCollectorConfig *configuration.CollectorConfig, + beatMonitoringConfigGetter translate.BeatMonitoringConfigGetter, + stopTimeout time.Duration, +) (*OTelManager, error) { + var exec collectorExecution + var recoveryTimer collectorRecoveryTimer + var err error + + hcUUID, err := uuid.NewV4() + if err != nil { + return nil, fmt.Errorf("cannot generate UUID: %w", err) + } + hcUUIDStr := hcUUID.String() + + // determine the otel collector ports + collectorMetricsPort, collectorHealthCheckPort := 0, 0 + if agentCollectorConfig != nil { + if agentCollectorConfig.HealthCheckConfig.Endpoint != "" { + collectorHealthCheckPort, err = agentCollectorConfig.HealthCheckConfig.Port() + if err != nil { + return nil, fmt.Errorf("invalid collector health check port: %w", err) + } + } + if agentCollectorConfig.TelemetryConfig.Endpoint != "" { + collectorMetricsPort, err = agentCollectorConfig.TelemetryConfig.Port() + if err != nil { + return nil, fmt.Errorf("invalid collector metrics port: %w", err) + } + } + } + + executable := filepath.Join(paths.Components(), collectorBinaryName) + recoveryTimer = newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute) + exec, err = newSubprocessExecution(executable, hcUUIDStr, collectorMetricsPort, collectorHealthCheckPort) + if err != nil { + return nil, fmt.Errorf("failed to create subprocess execution: %w", err) + } + + return &OTelManager{ + logger: logger, + baseLogger: baseLogger, + agentInfo: agentInfo, + beatMonitoringConfigGetter: beatMonitoringConfigGetter, + healthCheckExtID: fmt.Sprintf("extension:healthcheckv2/%s", hcUUIDStr), + errCh: make(chan error, 1), // holds at most one error + collectorStatusCh: make(chan *status.AggregateStatus, 1), + // componentStateCh uses a buffer channel to ensure that no state transitions are missed and to prevent + // any possible case of deadlock, 5 is used just to give a small buffer. + componentStateCh: make(chan []runtime.ComponentComponentState, 5), + updateCh: make(chan configUpdate, 1), + doneChan: make(chan struct{}), + execution: exec, + recoveryTimer: recoveryTimer, + collectorRunErr: make(chan error), + stopTimeout: stopTimeout, + logLevel: logLevel.String(), + }, nil +>>>>>>> 85b7e9932 ((bugfix) log level does not change when standalone agent is reloaded or when otel runtime is used (#11998)) } // Run runs the lifecycle of the manager. 
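Reviewer note before the Run-loop hunk: the new constructor hands the manager everything it needs for log-level-aware restarts, and callers then drive it through `Run`, `Update`, and `Errors`. The fragment below is a rough lifecycle sketch assuming a manager already built via `NewOTelManager` as in application.go above; `runCollectorManager` is a hypothetical helper, not code from this PR.

```go
import (
	"context"
	"errors"
	"log"

	"go.opentelemetry.io/collector/confmap"

	"github.com/elastic/elastic-agent-libs/logp"
	"github.com/elastic/elastic-agent/internal/pkg/otel/manager"
)

// runCollectorManager runs the manager, pushes one configuration, and drains errors.
func runCollectorManager(ctx context.Context, mgr *manager.OTelManager, cfg *confmap.Conf) {
	go func() {
		if err := mgr.Run(ctx); err != nil && !errors.Is(err, context.Canceled) {
			log.Printf("otel manager exited: %v", err)
		}
	}()

	// The log level travels with every update, so a reloaded agent.logging.level
	// takes effect on the next Update; a nil config stops the collector.
	mgr.Update(cfg, nil, logp.InfoLevel, nil)

	for {
		select {
		case <-ctx.Done():
			return
		case err := <-mgr.Errors():
			if err != nil {
				log.Printf("collector reported error: %v", err)
			}
		}
	}
}
```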
@@ -73,14 +191,46 @@ func (m *OTelManager) Run(ctx context.Context) error { <-runErrCh // wait for collector to be stopped } return ctx.Err() +<<<<<<< HEAD case err = <-runErrCh: +======= + case <-m.recoveryTimer.C(): + m.recoveryTimer.Stop() + + if m.mergedCollectorCfg == nil || m.proc != nil || ctx.Err() != nil { + // no configuration, or the collector is already running, or the context + // is cancelled. + continue + } + + // at this point no critical errors are occurring + // any issues starting the collector are reporting in the status + reportErr(ctx, m.errCh, nil) + + newRetries := m.recoveryRetries.Add(1) + m.logger.Infof("collector recovery restarting, total retries: %d", newRetries) + m.proc, err = m.execution.startCollector(ctx, m.logLevel, m.baseLogger, m.logger, m.mergedCollectorCfg, m.collectorRunErr, collectorStatusCh, forceFetchStatusCh) + if err != nil { + // report a startup error (this gets reported as status) + m.reportStartupErr(ctx, err) + // reset the restart timer to the next backoff + recoveryDelay := m.recoveryTimer.ResetNext() + m.logger.Errorf("collector exited with error (will try to recover in %s): %v", recoveryDelay.String(), err) + } + case err = <-m.collectorRunErr: + m.recoveryTimer.Stop() +>>>>>>> 85b7e9932 ((bugfix) log level does not change when standalone agent is reloaded or when otel runtime is used (#11998)) if err == nil { // err is nil but there is a configuration // // in this rare case the collector stopped running but a configuration was // provided and the collector stopped with a clean exit +<<<<<<< HEAD cancel() cancel, provider, err = m.startCollector(m.cfg, runErrCh) +======= + m.proc, err = m.execution.startCollector(ctx, m.logLevel, m.baseLogger, m.logger, m.mergedCollectorCfg, m.collectorRunErr, collectorStatusCh, forceFetchStatusCh) +>>>>>>> 85b7e9932 ((bugfix) log level does not change when standalone agent is reloaded or when otel runtime is used (#11998)) if err != nil { // failed to create the collector (this is different then // it's failing to run). 
we do not retry creation on failure @@ -115,6 +265,7 @@ func (m *OTelManager) Run(ctx context.Context) error { m.reportErr(ctx, err) } } +<<<<<<< HEAD case cfg := <-m.cfgCh: m.cfg = cfg if cfg == nil { @@ -133,6 +284,65 @@ func (m *OTelManager) Run(ctx context.Context) error { case m.statusCh <- nil: case <-ctx.Done(): } +======= + + case cfgUpdate := <-m.updateCh: + // we received a new configuration, thus stop the recovery timer + // and reset the retry count + m.recoveryTimer.Stop() + m.recoveryRetries.Store(0) + mergedCfg, err := buildMergedConfig(cfgUpdate, m.agentInfo, m.beatMonitoringConfigGetter, m.baseLogger) + if err != nil { + // critical error, merging the configuration should always work + reportErr(ctx, m.errCh, err) + continue + } + + // this is the only place where we mutate the internal config attributes, take a write lock for the duration + m.mx.Lock() + previousConfigHash := m.mergedCollectorCfgHash + configChanged, configUpdateErr := m.maybeUpdateMergedConfig(mergedCfg) + m.collectorCfg = cfgUpdate.collectorCfg + m.components = cfgUpdate.components + // set the log level defined in service::telemetry::log::level setting + if mergedCfg != nil && mergedCfg.IsSet("service::telemetry::logs::level") { + if logLevel, ok := mergedCfg.Get("service::telemetry::logs::level").(string); ok { + m.logLevel = logLevel + } else { + m.logger.Warn("failed to access log level from service::telemetry::logs::level") + } + } else { + // when mergedCfg is nil use coordinator's log level + m.logLevel = cfgUpdate.logLevel.String() + } + m.mx.Unlock() + + if configUpdateErr != nil { + m.logger.Warn("failed to calculate hash of merged config, proceeding with update", zap.Error(configUpdateErr)) + } + + if configChanged { + m.logger.Debugf( + "new config hash (%d) is different than the old config hash (%d), applying update", + m.mergedCollectorCfgHash, previousConfigHash) + applyErr := m.applyMergedConfig(ctx, collectorStatusCh, m.collectorRunErr, forceFetchStatusCh) + // only report the error if we actually apply the update + // otherwise, we could override an actual error with a nil in the channel when the collector + // state doesn't actually change + reportErr(ctx, m.errCh, applyErr) + } else { + m.logger.Debugf( + "new config hash (%d) is identical to the old config hash (%d), skipping update", + m.mergedCollectorCfgHash, previousConfigHash) + + // there was a config update, but the hash hasn't changed. + // Force fetch the latest collector status in case the user modified the output.status_reporting flag. + // + // drain the channel first + select { + case <-forceFetchStatusCh: + default: +>>>>>>> 85b7e9932 ((bugfix) log level does not change when standalone agent is reloaded or when otel runtime is used (#11998)) } // ensure that the coordinator knows that there is no error // as the collector is not running anymore @@ -169,10 +379,240 @@ func (m *OTelManager) Errors() <-chan error { return m.errCh } +<<<<<<< HEAD // Update updates the configuration. // // When nil is passed for the cfg, then the collector is stopped. func (m *OTelManager) Update(cfg *confmap.Conf) { +======= +// buildMergedConfig combines collector configuration with component-derived configuration. 
+func buildMergedConfig( + cfgUpdate configUpdate, + agentInfo info.Agent, + monitoringConfigGetter translate.BeatMonitoringConfigGetter, + logger *logp.Logger, +) (*confmap.Conf, error) { + mergedOtelCfg := confmap.New() + + // Generate component otel config if there are components + var componentOtelCfg *confmap.Conf + if len(cfgUpdate.components) > 0 { + model := &component.Model{Components: cfgUpdate.components} + var err error + componentOtelCfg, err = translate.GetOtelConfig(model, agentInfo, monitoringConfigGetter, logger) + if err != nil { + return nil, fmt.Errorf("failed to generate otel config: %w", err) + } + + level := translate.GetOTelLogLevel(cfgUpdate.logLevel.String()) + if err := componentOtelCfg.Merge(confmap.NewFromStringMap(map[string]any{"service::telemetry::logs::level": level})); err != nil { + return nil, fmt.Errorf("failed to set log level in otel config: %w", err) + } + + } + + // If both configs are nil, return nil so the manager knows to stop the collector + if componentOtelCfg == nil && cfgUpdate.collectorCfg == nil { + return nil, nil + } + + // Merge component config if it exists + if componentOtelCfg != nil { + err := mergedOtelCfg.Merge(componentOtelCfg) + if err != nil { + return nil, fmt.Errorf("failed to merge component otel config: %w", err) + } + + if mCfg := cfgUpdate.monitoringCfg; mCfg != nil { + if mCfg.Enabled && mCfg.MonitorMetrics { + // Metrics monitoring is enabled, inject a receiver for the + // collector's internal telemetry. + err := injectMonitoringReceiver(mergedOtelCfg, mCfg, agentInfo) + if err != nil { + return nil, fmt.Errorf("merging internal telemetry config: %w", err) + } + } + } + } + + // Merge with base collector config if it exists + if cfgUpdate.collectorCfg != nil { + err := mergedOtelCfg.Merge(cfgUpdate.collectorCfg) + if err != nil { + return nil, fmt.Errorf("failed to merge collector otel config: %w", err) + } + } + + if err := addCollectorMetricsReader(mergedOtelCfg); err != nil { + return nil, fmt.Errorf("failed to add random collector metrics port: %w", err) + } + + if err := injectDiagnosticsExtension(mergedOtelCfg); err != nil { + return nil, fmt.Errorf("failed to inject diagnostics: %w", err) + } + + return mergedOtelCfg, nil +} + +func injectDiagnosticsExtension(config *confmap.Conf) error { + extensionCfg := map[string]any{ + "extensions": map[string]any{ + "elastic_diagnostics": map[string]any{ + "endpoint": paths.DiagnosticsExtensionSocket(), + }, + }, + } + if config.IsSet("service::extensions") { + extensionList := config.Get("service::extensions").([]interface{}) + if slices.Contains(extensionList, "elastic_diagnostics") { + // already configured, nothing to do + return nil + } + extensionList = append(extensionList, "elastic_diagnostics") + extensionCfg["service::extensions"] = extensionList + } + + return config.Merge(confmap.NewFromStringMap(extensionCfg)) +} + +func monitoringEventTemplate(monitoring *monitoringCfg.MonitoringConfig, agentInfo info.Agent) map[string]any { + namespace := "default" + if monitoring.Namespace != "" { + namespace = monitoring.Namespace + } + return map[string]any{ + "data_stream": map[string]any{ + "dataset": "elastic_agent.elastic_agent", + "namespace": namespace, + "type": "metrics", + }, + "event": map[string]any{ + "dataset": "elastic_agent.elastic_agent", + }, + "elastic_agent": map[string]any{ + "id": agentInfo.AgentID(), + "process": "elastic-agent", + "snapshot": agentInfo.Snapshot(), + "version": agentInfo.Version(), + }, + "agent": mapstr.M{ + "id": 
agentInfo.AgentID(), + }, + "component": mapstr.M{ + "binary": "elastic-agent", + "id": "elastic-agent/collector", + }, + "metricset": mapstr.M{ + "name": "stats", + }, + } +} + +func injectMonitoringReceiver( + config *confmap.Conf, + monitoring *monitoringCfg.MonitoringConfig, + agentInfo info.Agent, +) error { + receiverType := otelcomponent.MustNewType(elasticmonitoringreceiver.Name) + receiverName := "collector/internal-telemetry-monitoring" + receiverID := translate.GetReceiverID(receiverType, receiverName).String() + pipelineID := "logs/" + translate.OtelNamePrefix + receiverName + exporterType := otelcomponent.MustNewType("elasticsearch") + exporterID := translate.GetExporterID(exporterType, componentmonitoring.MonitoringOutput).String() + monitoringExporterFound := false + if config.IsSet("exporters") { + // Search the defined exporters for one with the expected id for monitoring + for exporter := range config.Get("exporters").(map[string]any) { + if exporter == exporterID { + monitoringExporterFound = true + } + } + } + if !monitoringExporterFound { + // We can't monitor OTel metrics without OTel-based monitoring + return nil + } + receiverCfg := map[string]any{ + "receivers": map[string]any{ + receiverID: map[string]any{ + "event_template": monitoringEventTemplate(monitoring, agentInfo), + "interval": monitoring.MetricsPeriod, + }, + }, + "service": map[string]any{ + "pipelines": map[string]any{ + pipelineID: map[string]any{ + "receivers": []string{receiverID}, + "exporters": []string{exporterID}, + }, + }, + }, + } + return config.Merge(confmap.NewFromStringMap(receiverCfg)) +} + +func (m *OTelManager) applyMergedConfig(ctx context.Context, + collectorStatusCh chan *status.AggregateStatus, + collectorRunErr chan error, + forceFetchStatusCh chan struct{}, +) error { + if m.proc != nil { + m.proc.Stop(m.stopTimeout) + m.proc = nil + // We wait here for the collector to exit before possibly starting a new one. The execution indicates this + // by sending an error over the appropriate channel. It will also send a nil status that we'll either process + // after exiting from this function and going back to the main loop, or it will be overridden by the status + // from the newly started collector. + // This is the only blocking wait inside the main loop involving channels, so we need to be extra careful not to + // deadlock. + // TODO: Verify if we need to wait for the error at all. Stop() is already blocking. + select { + case <-collectorRunErr: + case <-ctx.Done(): + // our caller ctx is Done + return ctx.Err() + } + } + + if m.mergedCollectorCfg == nil { + // no configuration then the collector should not be + // running. + // ensure that the coordinator knows that there is no error + // as the collector is not running anymore + return nil + } else { + // either a new configuration or the first configuration + // that results in the collector being started + proc, err := m.execution.startCollector(ctx, m.logLevel, m.baseLogger, m.logger, m.mergedCollectorCfg, collectorRunErr, collectorStatusCh, forceFetchStatusCh) + if err != nil { + // failed to create the collector (this is different then + // it's failing to run). we do not retry creation on failure + // as it will always fail. 
A new configuration is required for + // it not to fail (a new configuration will result in the retry) + // since this is a new configuration we want to start the timer + // from the initial delay + recoveryDelay := m.recoveryTimer.ResetInitial() + m.logger.Errorf("collector exited with error (will try to recover in %s): %v", recoveryDelay.String(), err) + return err + } else { + // all good at the moment (possible that it will fail) + m.proc = proc + } + } + return nil +} + +// Update sends collector configuration and component updates to the manager's run loop. +func (m *OTelManager) Update(cfg *confmap.Conf, monitoring *monitoringCfg.MonitoringConfig, ll logp.Level, components []component.Component) { + cfgUpdate := configUpdate{ + collectorCfg: cfg, + monitoringCfg: monitoring, + components: components, + logLevel: ll, + } + + // we care only about the latest config update +>>>>>>> 85b7e9932 ((bugfix) log level does not change when standalone agent is reloaded or when otel runtime is used (#11998)) select { case m.cfgCh <- cfg: case <-m.doneChan: diff --git a/internal/pkg/otel/manager/manager_test.go b/internal/pkg/otel/manager/manager_test.go index 13518a38e5c..c2e673bf4e6 100644 --- a/internal/pkg/otel/manager/manager_test.go +++ b/internal/pkg/otel/manager/manager_test.go @@ -64,7 +64,802 @@ var ( } ) +<<<<<<< HEAD func TestOTelManager_Run(t *testing.T) { +======= +type testExecution struct { + mtx sync.Mutex + exec collectorExecution + handle collectorHandle +} + +func (e *testExecution) startCollector(ctx context.Context, level string, baseLogger *logger.Logger, logger *logger.Logger, cfg *confmap.Conf, errCh chan error, statusCh chan *status.AggregateStatus, forceFetchStatusCh chan struct{}) (collectorHandle, error) { + e.mtx.Lock() + defer e.mtx.Unlock() + + var err error + e.handle, err = e.exec.startCollector(ctx, level, baseLogger, logger, cfg, errCh, statusCh, forceFetchStatusCh) + return e.handle, err +} + +func (e *testExecution) getProcessHandle() collectorHandle { + e.mtx.Lock() + defer e.mtx.Unlock() + + return e.handle +} + +var _ collectorExecution = &mockExecution{} + +type mockExecution struct { + errCh chan error + statusCh chan *status.AggregateStatus + cfg *confmap.Conf + collectorStarted chan struct{} +} + +func (e *mockExecution) startCollector( + ctx context.Context, + level string, + _ *logger.Logger, + _ *logger.Logger, + cfg *confmap.Conf, + errCh chan error, + statusCh chan *status.AggregateStatus, + _ chan struct{}, +) (collectorHandle, error) { + e.errCh = errCh + e.statusCh = statusCh + e.cfg = cfg + stopCh := make(chan struct{}) + collectorCtx, collectorCancel := context.WithCancel(ctx) + go func() { + <-collectorCtx.Done() + close(stopCh) + reportErr(ctx, errCh, nil) + }() + handle := &mockCollectorHandle{ + stopCh: stopCh, + cancel: collectorCancel, + } + if e.collectorStarted != nil { + e.collectorStarted <- struct{}{} + } + return handle, nil +} + +var _ collectorHandle = &mockCollectorHandle{} + +type mockCollectorHandle struct { + stopCh chan struct{} + cancel context.CancelFunc +} + +func (h *mockCollectorHandle) Stop(waitTime time.Duration) { + h.cancel() + select { + case <-time.After(waitTime): + case <-h.stopCh: + } +} + +// EventListener listens to the events from the OTelManager and stores the latest error and status. 
+type EventListener struct { + mtx sync.Mutex + err *EventTime[error] + collectorStatus *EventTime[*status.AggregateStatus] + componentStates *EventTime[[]runtime.ComponentComponentState] +} + +// Listen starts listening to the error and status channels. It updates the latest error and status in the +// EventListener. +func (e *EventListener) Listen( + ctx context.Context, + errorCh <-chan error, + collectorStatusCh <-chan *status.AggregateStatus, + componentStateCh <-chan []runtime.ComponentComponentState, +) { + for { + select { + case <-ctx.Done(): + return + case c := <-collectorStatusCh: + e.mtx.Lock() + e.collectorStatus = &EventTime[*status.AggregateStatus]{val: c, time: time.Now()} + e.mtx.Unlock() + case c := <-errorCh: + e.mtx.Lock() + e.err = &EventTime[error]{val: c, time: time.Now()} + e.mtx.Unlock() + case componentStates := <-componentStateCh: + e.mtx.Lock() + e.componentStates = &EventTime[[]runtime.ComponentComponentState]{val: componentStates, time: time.Now()} + e.mtx.Unlock() + } + } +} + +// getError retrieves the latest error from the EventListener. +func (e *EventListener) getError() error { + e.mtx.Lock() + defer e.mtx.Unlock() + return e.err.Value() +} + +// getCollectorStatus retrieves the latest collector status from the EventListener. +func (e *EventListener) getCollectorStatus() *status.AggregateStatus { + e.mtx.Lock() + defer e.mtx.Unlock() + return e.collectorStatus.Value() +} + +// EnsureHealthy ensures that the OTelManager is healthy by checking the latest error and status. +func (e *EventListener) EnsureHealthy(t *testing.T, u time.Time) { + require.EventuallyWithT(t, func(collect *assert.CollectT) { + e.mtx.Lock() + latestErr := e.err + latestStatus := e.collectorStatus + e.mtx.Unlock() + + // we expect to have a reported error which is nil and a reported status which is StatusOK + require.NotNil(collect, latestErr) + assert.Nil(collect, latestErr.Value()) + assert.False(collect, latestErr.Before(u)) + require.NotNil(collect, latestStatus) + require.NotNil(collect, latestStatus.Value()) + assert.False(collect, latestStatus.Before(u)) + require.Equal(collect, componentstatus.StatusOK, latestStatus.Value().Status()) + }, 60*time.Second, 1*time.Second, "otel collector never got healthy") +} + +// EnsureFatal ensures that the OTelManager is fatal by checking the latest error and status. +func (e *EventListener) EnsureFatal(t *testing.T, u time.Time, extraT ...func(collectT *assert.CollectT, latestErr *EventTime[error], latestStatus *EventTime[*status.AggregateStatus])) { + assert.EventuallyWithT(t, func(collect *assert.CollectT) { + e.mtx.Lock() + latestErr := e.err + latestStatus := e.collectorStatus + e.mtx.Unlock() + + // we expect to have a reported error which is nil and a reported status which is StatusOK + require.NotNil(collect, latestErr) + assert.Nil(collect, latestErr.Value()) + assert.False(collect, latestErr.Before(u)) + require.NotNil(collect, latestStatus) + require.NotNil(collect, latestStatus.Value()) + assert.False(collect, latestStatus.Before(u)) + require.Equal(collect, componentstatus.StatusFatalError, latestStatus.Value().Status()) + + // extra checks + for _, et := range extraT { + et(collect, latestErr, latestStatus) + } + }, 60*time.Second, 1*time.Second, "otel collector never fatal") +} + +// EnsureOffWithoutError ensures that the OTelManager is off without an error by checking the latest error and status. 
+func (e *EventListener) EnsureOffWithoutError(t *testing.T, u time.Time) { + require.EventuallyWithT(t, func(collect *assert.CollectT) { + e.mtx.Lock() + latestErr := e.err + latestStatus := e.collectorStatus + e.mtx.Unlock() + + // we expect to have a reported error which is nil and a reported status which is nil + require.NotNil(collect, latestErr) + assert.Nil(collect, latestErr.Value()) + assert.False(collect, latestErr.Before(u)) + require.NotNil(collect, latestStatus) + assert.Nil(collect, latestStatus.Value()) + assert.False(collect, latestStatus.Before(u)) + }, 60*time.Second, 1*time.Second, "otel collector never stopped without an error") +} + +// EnsureOffWithError ensures that the OTelManager is off with an error by checking the latest error and status. +func (e *EventListener) EnsureOffWithError(t *testing.T, u time.Time) { + require.EventuallyWithT(t, func(collect *assert.CollectT) { + e.mtx.Lock() + latestErr := e.err + latestStatus := e.collectorStatus + e.mtx.Unlock() + + // we expect to have a reported error which is not nil and a reported status which is nil + require.False(collect, latestErr == nil || latestErr.Before(u) || latestErr.Value() == nil) + require.False(collect, latestStatus == nil || latestStatus.Before(u) || latestStatus.Value() != nil) + }, 60*time.Second, 1*time.Second, "otel collector never errored with an error") +} + +// EventTime is a wrapper around a time.Time and a value of type T. It provides methods to compare the time and retrieve the value. +type EventTime[T interface{}] struct { + time time.Time + val T +} + +// Before checks if the EventTime's time is before the given time u. +func (t *EventTime[T]) Before(u time.Time) bool { + return t != nil && t.time.Before(u) +} + +// Value retrieves the value of type T from the EventTime. If the EventTime is nil, it returns the zero value of T. +func (t *EventTime[T]) Value() T { + if t == nil { + var zero T + return zero + } + return t.val +} + +// Time retrieves the time associated with the EventTime. If the EventTime is nil, it returns the zero value of time.Time. 
+func (t *EventTime[T]) Time() time.Time { + if t == nil { + return time.Time{} + } + return t.time +} + +func countHealthCheckExtensionStatuses(status *status.AggregateStatus) uint { + if status == nil { + return 0 + } + extensions, ok := status.ComponentStatusMap["extensions"] + if !ok { + return 0 + } + + count := uint(0) + for key := range extensions.ComponentStatusMap { + if strings.HasPrefix(key, "extension:healthcheckv2/") { + count++ + } + } + + return count +} + +func TestOTelManager_Run(t *testing.T) { + wd, erWd := os.Getwd() + require.NoError(t, erWd, "cannot get working directory") + + testBinary := filepath.Join(wd, "..", "..", "..", "..", "internal", "edot", "testing", "testing") + require.FileExists(t, testBinary, "testing binary not found") + + const waitTimeForStop = 30 * time.Second + + for _, tc := range []struct { + name string + execModeFn func(collectorRunErr chan error) (collectorExecution, error) + restarter collectorRecoveryTimer + skipListeningErrors bool + testFn func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) + }{ + { + name: "subprocess collector config updates", + execModeFn: func(collectorRunErr chan error) (collectorExecution, error) { + hcUUID, err := uuid.NewV4() + if err != nil { + return nil, fmt.Errorf("cannot generate UUID: %w", err) + } + return newSubprocessExecution(testBinary, hcUUID.String(), 0, 0) + }, + restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { + // ensure that it got healthy + cfg := confmap.NewFromStringMap(testConfig) + updateTime := time.Now() + m.Update(cfg, nil, logp.InfoLevel, nil) + e.EnsureHealthy(t, updateTime) + + // trigger update + updateTime = time.Now() + ok := cfg.Delete("service::telemetry::logs::level") // modify the config + require.True(t, ok) + m.Update(cfg, nil, logp.InfoLevel, nil) + e.EnsureHealthy(t, updateTime) + + // no configuration should stop the runner + updateTime = time.Now() + m.Update(nil, nil, logp.InfoLevel, nil) + e.EnsureOffWithoutError(t, updateTime) + assert.True(t, m.recoveryTimer.IsStopped(), "restart timer should be stopped") + assert.Equal(t, uint32(0), m.recoveryRetries.Load(), "recovery retries should be 0") + }, + }, + { + name: "subprocess collector stopped gracefully outside manager", + execModeFn: func(collectorRunErr chan error) (collectorExecution, error) { + hcUUID, err := uuid.NewV4() + if err != nil { + return nil, fmt.Errorf("cannot generate UUID: %w", err) + } + return newSubprocessExecution(testBinary, hcUUID.String(), 0, 0) + }, + restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { + // ensure that it got healthy + cfg := confmap.NewFromStringMap(testConfig) + updateTime := time.Now() + m.Update(cfg, nil, logp.InfoLevel, nil) + e.EnsureHealthy(t, updateTime) + + // stop it, this should be restarted by the manager + updateTime = time.Now() + execHandle := exec.getProcessHandle() + require.NotNil(t, execHandle, "execModeFn handle should not be nil") + execHandle.Stop(waitTimeForStop) + e.EnsureHealthy(t, updateTime) + require.EqualValues(t, 0, countHealthCheckExtensionStatuses(e.getCollectorStatus()), "health check extension status 
count should be 0") + + // no configuration should stop the runner + updateTime = time.Now() + m.Update(nil, nil, logp.InfoLevel, nil) + e.EnsureOffWithoutError(t, updateTime) + assert.True(t, m.recoveryTimer.IsStopped(), "restart timer should be stopped") + assert.Equal(t, uint32(0), m.recoveryRetries.Load(), "recovery retries should be 0") + }, + }, + { + name: "subprocess collector killed outside manager", + execModeFn: func(collectorRunErr chan error) (collectorExecution, error) { + hcUUID, err := uuid.NewV4() + if err != nil { + return nil, fmt.Errorf("cannot generate UUID: %w", err) + } + return newSubprocessExecution(testBinary, hcUUID.String(), 0, 0) + }, + restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { + // ensure that it got healthy + cfg := confmap.NewFromStringMap(testConfig) + updateTime := time.Now() + m.Update(cfg, nil, logp.InfoLevel, nil) + e.EnsureHealthy(t, updateTime) + require.EqualValues(t, 0, countHealthCheckExtensionStatuses(e.getCollectorStatus()), "health check extension status count should be 0") + + var oldPHandle *procHandle + // repeatedly kill the collector + for i := 0; i < 3; i++ { + // kill it + handle := exec.getProcessHandle() + require.NotNil(t, handle, "execModeFn handle should not be nil, iteration ", i) + pHandle, ok := handle.(*procHandle) + require.True(t, ok, "execModeFn handle should be of type procHandle, iteration ", i) + if oldPHandle != nil { + require.NotEqual(t, pHandle.processInfo.PID, oldPHandle.processInfo.PID, "processes PIDs should be different, iteration ", i) + } + oldPHandle = pHandle + require.NoError(t, pHandle.processInfo.Kill(), "failed to kill collector process, iteration ", i) + // the collector should restart and report healthy + updateTime = time.Now() + e.EnsureHealthy(t, updateTime) + require.EqualValues(t, 0, countHealthCheckExtensionStatuses(e.getCollectorStatus()), "health check extension status count should be 0") + } + + seenRecoveredTimes := m.recoveryRetries.Load() + + // no configuration should stop the runner + updateTime = time.Now() + m.Update(nil, nil, logp.InfoLevel, nil) + e.EnsureOffWithoutError(t, updateTime) + assert.True(t, m.recoveryTimer.IsStopped(), "restart timer should be stopped") + assert.Equal(t, uint32(3), seenRecoveredTimes, "recovery retries should be 3") + }, + }, + { + name: "subprocess collector panics restarts", + execModeFn: func(collectorRunErr chan error) (collectorExecution, error) { + hcUUID, err := uuid.NewV4() + if err != nil { + return nil, fmt.Errorf("cannot generate UUID: %w", err) + } + return newSubprocessExecution(testBinary, hcUUID.String(), 0, 0) + }, + restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { + err := os.Setenv("TEST_SUPERVISED_COLLECTOR_PANIC", (3 * time.Second).String()) + require.NoError(t, err, "failed to set TEST_SUPERVISED_COLLECTOR_PANIC env var") + t.Cleanup(func() { + _ = os.Unsetenv("TEST_SUPERVISED_COLLECTOR_PANIC") + }) + + cfg := confmap.NewFromStringMap(testConfig) + m.Update(cfg, nil, logp.InfoLevel, nil) + + seenRecoveredTimes := uint32(0) + require.Eventually(t, func() bool { + seenRecoveredTimes = m.recoveryRetries.Load() + return seenRecoveredTimes > 2 + }, time.Minute, time.Second, 
"expected recovered times to be at least 3, got %d", seenRecoveredTimes) + + err = os.Unsetenv("TEST_SUPERVISED_COLLECTOR_PANIC") + require.NoError(t, err, "failed to unset TEST_SUPERVISED_COLLECTOR_PANIC env var") + updateTime := time.Now() + e.EnsureHealthy(t, updateTime) + + // no configuration should stop the runner + updateTime = time.Now() + m.Update(nil, nil, logp.InfoLevel, nil) + e.EnsureOffWithoutError(t, updateTime) + require.True(t, m.recoveryTimer.IsStopped(), "restart timer should be stopped") + assert.GreaterOrEqual(t, uint32(3), seenRecoveredTimes, "recovery retries should be 3") + }, + }, + { + name: "subprocess collector panics reports fatal", + execModeFn: func(collectorRunErr chan error) (collectorExecution, error) { + hcUUID, err := uuid.NewV4() + if err != nil { + return nil, fmt.Errorf("cannot generate UUID: %w", err) + } + return newSubprocessExecution(testBinary, hcUUID.String(), 0, 0) + }, + restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { + // panic instantly always + err := os.Setenv("TEST_SUPERVISED_COLLECTOR_PANIC", "0s") + require.NoError(t, err, "failed to set TEST_SUPERVISED_COLLECTOR_PANIC env var") + t.Cleanup(func() { + _ = os.Unsetenv("TEST_SUPERVISED_COLLECTOR_PANIC") + }) + + cfg := confmap.NewFromStringMap(testConfig) + m.Update(cfg, nil, logp.InfoLevel, nil) + + // ensure that it reports a generic fatal error for all components, a panic cannot be assigned to + // a specific component in the collector + e.EnsureFatal(t, time.Now().Add(time.Second), func(collectT *assert.CollectT, _ *EventTime[error], latestStatus *EventTime[*status.AggregateStatus]) { + status := latestStatus.Value() + + // healthcheck auto added + extensions, ok := status.ComponentStatusMap["extensions"] + require.True(collectT, ok, "extensions should be present") + assert.Equal(collectT, extensions.Status(), componentstatus.StatusFatalError) + + metrics, ok := status.ComponentStatusMap["pipeline:metrics"] + require.True(collectT, ok, "pipeline metrics should be present") + assert.Equal(collectT, metrics.Status(), componentstatus.StatusFatalError) + + logs, ok := status.ComponentStatusMap["pipeline:logs"] + require.True(collectT, ok, "pipeline logs should be present") + assert.Equal(collectT, logs.Status(), componentstatus.StatusFatalError) + + traces, ok := status.ComponentStatusMap["pipeline:traces"] + require.True(collectT, ok, "pipeline traces should be present") + assert.Equal(collectT, traces.Status(), componentstatus.StatusFatalError) + }) + }, + }, + { + name: "subprocess collector killed if delayed and manager is stopped", + execModeFn: func(collectorRunErr chan error) (collectorExecution, error) { + hcUUID, err := uuid.NewV4() + if err != nil { + return nil, fmt.Errorf("cannot generate UUID: %w", err) + } + subprocessExec, err := newSubprocessExecution(testBinary, hcUUID.String(), 0, 0) + if err != nil { + return nil, err + } + subprocessExec.reportErrFn = func(ctx context.Context, errCh chan error, err error) { + // override the reportErrFn to send the error to this test collectorRunErr channel + // so we can listen to subprocess run errors + if errCh != collectorRunErr { + // if the error channel is not the one we expect, forward the error to the original reportErrFn + reportErr(ctx, errCh, err) + return + } + collectorRunErr <- err + } + return &testExecution{exec: subprocessExec}, nil + }, 
+ restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { + delayDuration := 40 * time.Second // the otel manager stop timeout is waitTimeForStop (30 seconds) + t.Setenv("TEST_SUPERVISED_COLLECTOR_DELAY", delayDuration.String()) + + // ensure that it got healthy + cfg := confmap.NewFromStringMap(testConfig) + updateTime := time.Now() + m.Update(cfg, nil, logp.InfoLevel, nil) + e.EnsureHealthy(t, updateTime) + + // stop the manager to simulate that elastic-agent is shutting down + managerCtxCancel() + + // wait for the manager to report done + select { + case <-m.doneChan: + case <-time.After(10 * time.Second): + require.Fail(t, "manager should have reported done") + case <-t.Context().Done(): + return + } + + // wait for the subprocess to exit by checking the collectorRunErr channel + select { + case err := <-collectorRunErr: + require.Error(t, err, "process should have exited with an error") + case <-t.Context().Done(): + return + case <-time.After(2 * waitTimeForStop): + require.Fail(t, "timeout waiting for process to exit") + } + }, + }, + { + name: "subprocess collector gracefully exited if delayed a bit and manager is stopped", + execModeFn: func(collectorRunErr chan error) (collectorExecution, error) { + hcUUID, err := uuid.NewV4() + if err != nil { + return nil, fmt.Errorf("cannot generate UUID: %w", err) + } + subprocessExec, err := newSubprocessExecution(testBinary, hcUUID.String(), 0, 0) + if err != nil { + return nil, err + } + subprocessExec.reportErrFn = func(ctx context.Context, errCh chan error, err error) { + // override the reportErrFn to send the error to this test collectorRunErr channel + // so we can listen to subprocess run errors + if errCh != collectorRunErr { + // if the error channel is not the one we expect, forward the error to the original reportErrFn + reportErr(ctx, errCh, err) + return + } + collectorRunErr <- err + } + return &testExecution{exec: subprocessExec}, nil + }, + restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { + delayDuration := 5 * time.Second // the otel manager stop timeout is waitTimeForStop (30 seconds) + t.Setenv("TEST_SUPERVISED_COLLECTOR_DELAY", delayDuration.String()) + + // ensure that it got healthy + cfg := confmap.NewFromStringMap(testConfig) + updateTime := time.Now() + m.Update(cfg, nil, logp.InfoLevel, nil) + e.EnsureHealthy(t, updateTime) + + // stop the manager to simulate that elastic-agent is shutting down + managerCtxCancel() + + // wait for the manager to report done + select { + case <-m.doneChan: + case <-time.After(10 * time.Second): + require.Fail(t, "manager should have reported done") + case <-t.Context().Done(): + return + } + + // wait for the subprocess to exit by checking the collectorRunErr channel + select { + case err := <-collectorRunErr: + require.NoError(t, err, "process should have exited without an error") + case <-t.Context().Done(): + return + case <-time.After(2 * waitTimeForStop): + require.Fail(t, "timeout waiting for process to exit") + } + }, + }, + { + name: "subprocess user has healthcheck extension", + execModeFn: func(collectorRunErr chan error) (collectorExecution, error) { + hcUUID, err := uuid.NewV4() + if err != nil { + return nil, 
fmt.Errorf("cannot generate UUID: %w", err) + } + return newSubprocessExecution(testBinary, hcUUID.String(), 0, 0) + }, + restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { + + subprocessExec, ok := exec.exec.(*subprocessExecution) + require.True(t, ok, "execution mode isn't subprocess") + + cfg := confmap.NewFromStringMap(testConfig) + + nsUUID, err := uuid.NewV4() + require.NoError(t, err, "failed to create a uuid") + + componentType, err := otelComponent.NewType(healthCheckExtensionName) + require.NoError(t, err, "failed to create component type") + + healthCheckExtensionID := otelComponent.NewIDWithName(componentType, nsUUID.String()).String() + + ports, err := findRandomTCPPorts(3) + require.NoError(t, err, "failed to find random tcp ports") + subprocessExec.collectorHealthCheckPort = ports[0] + subprocessExec.collectorMetricsPort = ports[1] + err = injectHealthCheckV2Extension(cfg, healthCheckExtensionID, ports[2]) + require.NoError(t, err, "failed to inject user health extension") + + updateTime := time.Now() + m.Update(cfg, nil, logp.InfoLevel, nil) + e.EnsureHealthy(t, updateTime) + + require.EqualValues(t, 1, countHealthCheckExtensionStatuses(e.getCollectorStatus()), "health check extension status count should be 1") + }, + }, + { + name: "subprocess collector empty config", + execModeFn: func(collectorRunErr chan error) (collectorExecution, error) { + hcUUID, err := uuid.NewV4() + if err != nil { + return nil, fmt.Errorf("cannot generate UUID: %w", err) + } + return newSubprocessExecution(testBinary, hcUUID.String(), 0, 0) + }, + restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), + skipListeningErrors: true, + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { + // Errors channel is non-blocking, should be able to send an Update that causes an error multiple + // times without it blocking on sending over the errCh. 
+ for range 3 {
+ // empty config
+ //
+ // this is really validating a flow that is not possible with the elastic-agent
+ // if the OTEL configuration is determined to be empty then it will not be run
+ //
+ // this does give a good test of a truly invalid configuration
+ cfg := confmap.New() // empty config
+ m.Update(cfg, nil, logp.InfoLevel, nil)
+
+ // delay between updates to ensure the collector will have to fail
+ <-time.After(100 * time.Millisecond)
+ }
+
+ // because of the retry logic and timing we need to ensure
+ // that this keeps retrying to see the error and only store
+ // an actual error
+ //
+ // a nil error just means that the collector is trying to restart
+ // which clears the error on the restart loop
+ timeoutCh := time.After(time.Second * 5)
+ var err error
+ outer:
+ for {
+ select {
+ case e := <-m.Errors():
+ if e != nil {
+ err = e
+ break outer
+ }
+ case <-timeoutCh:
+ break outer
+ }
+ }
+ assert.Error(t, err, "otel manager should have returned an error")
+ },
+ },
+ {
+ name: "subprocess collector failed to start",
+ execModeFn: func(collectorRunErr chan error) (collectorExecution, error) {
+ hcUUID, err := uuid.NewV4()
+ if err != nil {
+ return nil, fmt.Errorf("cannot generate UUID: %w", err)
+ }
+ return newSubprocessExecution(testBinary, hcUUID.String(), 0, 0)
+ },
+ restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute),
+ testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) {
+ // not valid receivers/exporters
+ //
+ // this needs to be reported as status errors
+ cfg := confmap.NewFromStringMap(map[string]interface{}{
+ "receivers": map[string]interface{}{
+ "invalid_receiver": map[string]interface{}{},
+ },
+ "exporters": map[string]interface{}{
+ "invalid_exporter": map[string]interface{}{},
+ },
+ "service": map[string]interface{}{
+ "pipelines": map[string]interface{}{
+ "traces": map[string]interface{}{
+ "receivers": []string{"invalid_receiver"},
+ "exporters": []string{"invalid_exporter"},
+ },
+ },
+ },
+ })
+ m.Update(cfg, nil, logp.InfoLevel, nil)
+ e.EnsureFatal(t, time.Now().Add(time.Second), func(collectT *assert.CollectT, _ *EventTime[error], latestStatus *EventTime[*status.AggregateStatus]) {
+ status := latestStatus.Value()
+
+ // healthcheck auto added
+ _, ok := status.ComponentStatusMap["extensions"]
+ require.True(collectT, ok, "extensions should be present")
+
+ traces, ok := status.ComponentStatusMap["pipeline:traces"]
+ require.True(collectT, ok, "pipeline traces should be present")
+ assert.Equal(collectT, traces.Status(), componentstatus.StatusFatalError)
+
+ exporter, ok := traces.ComponentStatusMap["exporter:invalid_exporter"]
+ require.True(collectT, ok, "exporter should be present")
+ receiver, ok := traces.ComponentStatusMap["receiver:invalid_receiver"]
+ require.True(collectT, ok, "receiver should be present")
+
+ // both invalid_receiver and invalid_exporter are invalid
+ assert.Equal(collectT, exporter.Status(), componentstatus.StatusFatalError)
+ assert.Equal(collectT, receiver.Status(), componentstatus.StatusFatalError)
+ })
+ },
+ },
+ } {
+ t.Run(tc.name, func(t *testing.T) {
+ ctx, cancel := context.WithCancel(t.Context())
+ defer cancel()
+ l, _ := loggertest.New("otel")
+ base, obs := loggertest.New("otel")
+
+ m := &OTelManager{
+ logger: l,
+ baseLogger: base,
+ errCh: make(chan error, 1), // holds at most one error
+ updateCh: make(chan configUpdate, 1),
+ collectorStatusCh: make(chan 
*status.AggregateStatus), + componentStateCh: make(chan []runtime.ComponentComponentState, 1), + doneChan: make(chan struct{}), + collectorRunErr: make(chan error), + recoveryTimer: tc.restarter, + stopTimeout: waitTimeForStop, + agentInfo: &info.AgentInfo{}, + } + + executionMode, err := tc.execModeFn(m.collectorRunErr) + require.NoError(t, err, "failed to create execution mode") + testExecutionMode := &testExecution{exec: executionMode} + m.execution = testExecutionMode + + eListener := &EventListener{} + defer func() { + if !t.Failed() { + return + } + t.Logf("latest received err: %s", eListener.getError()) + t.Logf("latest received status: %s", statusToYaml(eListener.getCollectorStatus())) + for _, entry := range obs.All() { + t.Logf("%+v", entry) + } + }() + + runWg := sync.WaitGroup{} + runWg.Add(1) + go func() { + defer runWg.Done() + if !tc.skipListeningErrors { + eListener.Listen(ctx, m.Errors(), m.WatchCollector(), m.WatchComponents()) + } else { + eListener.Listen(ctx, nil, m.WatchCollector(), m.WatchComponents()) + } + }() + + var runErr error + runWg.Add(1) + managerCtx, managerCancel := context.WithCancel(ctx) + go func() { + defer runWg.Done() + runErr = m.Run(managerCtx) + }() + + tc.testFn(t, m, eListener, testExecutionMode, managerCancel, m.collectorRunErr) + + cancel() + runWg.Wait() + if !errors.Is(runErr, context.Canceled) { + t.Errorf("otel manager returned unexpected error: %v", runErr) + } + }) + } +} + +func TestOTelManager_Logging(t *testing.T) { + wd, erWd := os.Getwd() + require.NoError(t, erWd, "cannot get working directory") + + testBinary := filepath.Join(wd, "..", "..", "..", "..", "internal", "edot", "testing", "testing") + require.FileExists(t, testBinary, "testing binary not found") + +>>>>>>> 85b7e9932 ((bugfix) log level does not change when standalone agent is reloaded or when otel runtime is used (#11998)) ctx, cancel := context.WithCancel(context.Background()) defer cancel() base, _ := loggertest.New("otel") @@ -85,6 +880,7 @@ func TestOTelManager_Run(t *testing.T) { err = e errMx.Unlock() } +<<<<<<< HEAD } } }() @@ -93,6 +889,16 @@ func TestOTelManager_Run(t *testing.T) { defer errMx.Unlock() return err } +======= + return newSubprocessExecution(testBinary, hcUUID.String(), 0, 0) + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + // the execution mode passed here is overridden below so it is irrelevant + m, err := NewOTelManager(l, logp.InfoLevel, base, &info.AgentInfo{}, nil, nil, waitTimeForStop) + require.NoError(t, err, "could not create otel manager") +>>>>>>> 85b7e9932 ((bugfix) log level does not change when standalone agent is reloaded or when otel runtime is used (#11998)) var latestMx sync.Mutex var latest *status.AggregateStatus @@ -138,9 +944,14 @@ func TestOTelManager_Run(t *testing.T) { lastStatus := getLatestStatus() lastErr := getLatestErr() +<<<<<<< HEAD // never got healthy, stop the manager and wait for it to end cancel() runWg.Wait() +======= + cfg := confmap.NewFromStringMap(testConfig) + m.Update(cfg, nil, logp.InfoLevel, nil) +>>>>>>> 85b7e9932 ((bugfix) log level does not change when standalone agent is reloaded or when otel runtime is used (#11998)) // if a run error happened then report that if !errors.Is(runErr, context.Canceled) { @@ -186,12 +997,203 @@ func TestOTelManager_Run(t *testing.T) { } } +<<<<<<< HEAD func TestOTelManager_ConfigError(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() base, _ := loggertest.New("otel") l, _ := loggertest.New("otel-manager") m := 
NewOTelManager(l, base) +======= +func TestOTelManager_Ports(t *testing.T) { + ports, err := findRandomTCPPorts(2) + require.NoError(t, err) + healthCheckPort, metricsPort := ports[0], ports[1] + agentCollectorConfig := configuration.CollectorConfig{ + HealthCheckConfig: configuration.CollectorHealthCheckConfig{ + Endpoint: fmt.Sprintf("http://localhost:%d", healthCheckPort), + }, + TelemetryConfig: configuration.CollectorTelemetryConfig{ + Endpoint: fmt.Sprintf("http://localhost:%d", metricsPort), + }, + } + + wd, erWd := os.Getwd() + require.NoError(t, erWd, "cannot get working directory") + + testBinary := filepath.Join(wd, "..", "..", "..", "..", "internal", "edot", "testing", "testing") + require.FileExists(t, testBinary, "testing binary not found") + + const waitTimeForStop = 30 * time.Second + + for _, tc := range []struct { + name string + execModeFn func(collectorRunErr chan error) (collectorExecution, error) + healthCheckEnabled bool + }{ + { + name: "subprocess execution", + execModeFn: func(collectorRunErr chan error) (collectorExecution, error) { + hcUUID, err := uuid.NewV4() + if err != nil { + return nil, fmt.Errorf("cannot generate UUID: %w", err) + } + return newSubprocessExecution(testBinary, hcUUID.String(), metricsPort, healthCheckPort) + }, + healthCheckEnabled: true, + }, + } { + t.Run(tc.name, func(t *testing.T) { + base, obs := loggertest.New("otel") + l, _ := loggertest.New("otel-manager") + ctx := t.Context() + + t.Cleanup(func() { + if t.Failed() { + for _, log := range obs.All() { + t.Logf("%+v", log) + } + } + }) + + // the execution mode passed here is overridden below so it is irrelevant + m, err := NewOTelManager( + l, + logp.InfoLevel, + base, + &info.AgentInfo{}, + &agentCollectorConfig, + nil, + waitTimeForStop, + ) + require.NoError(t, err, "could not create otel manager") + + executionMode, err := tc.execModeFn(m.collectorRunErr) + require.NoError(t, err, "failed to create execution mode") + testExecutionMode := &testExecution{exec: executionMode} + m.execution = testExecutionMode + + go func() { + err := m.Run(ctx) + assert.ErrorIs(t, err, context.Canceled, "otel manager should be cancelled") + }() + + go func() { + for { + select { + case colErr := <-m.Errors(): + require.NoError(t, colErr, "otel manager should not return errors") + case <-m.WatchComponents(): // ensure we receive component updates + case <-ctx.Done(): + return + } + } + }() + + cfg := confmap.NewFromStringMap(testConfig) + cfg.Delete("service::telemetry::metrics::level") // change this to default + m.Update(cfg, nil, logp.InfoLevel, nil) + + // wait until status reflects the config update + require.EventuallyWithT(t, func(collect *assert.CollectT) { + select { + case collectorStatus := <-m.WatchCollector(): + require.NotNil(collect, collectorStatus, "collector status should not be nil") + assert.Equal(collect, componentstatus.StatusOK, collectorStatus.Status()) + assert.NotEmpty(collect, collectorStatus.ComponentStatusMap) + case <-ctx.Done(): + require.NoError(collect, ctx.Err()) + } + }, time.Second*10, time.Second) + + // the collector should expose its status and metrics on the set ports + healthCheckUrl := fmt.Sprintf("http://localhost:%d%s", healthCheckPort, healthCheckHealthStatusPath) + metricsUrl := fmt.Sprintf("http://localhost:%d/metrics", metricsPort) + urlsToCheck := []string{metricsUrl} + if tc.healthCheckEnabled { + urlsToCheck = append(urlsToCheck, healthCheckUrl) + } + for _, url := range urlsToCheck { + assert.EventuallyWithT(t, func(collect *assert.CollectT) { + 
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + assert.NoError(collect, err) + resp, err := http.DefaultClient.Do(req) + require.NoError(collect, err) + defer func() { + _ = resp.Body.Close() + }() + assert.Equal(collect, http.StatusOK, resp.StatusCode) + }, time.Second*10, time.Second) + } + }) + } +} + +// TestOTelManager_PortConflict test verifies that the collector restarts and tries new ports if it encounters a port +// conflict. +func TestOTelManager_PortConflict(t *testing.T) { + // switch the net.Listen implementation with one that returns test listeners that ignore the Close call the first + // two times + var timesCalled int + var mx sync.Mutex + netListen = func(network string, address string) (net.Listener, error) { + mx.Lock() + defer mx.Unlock() + l, err := net.Listen(network, address) + if err != nil { + return nil, err + } + if timesCalled < 2 { + // only actually close the listener after test completion, freeing the port + t.Cleanup(func() { + assert.NoError(t, l.Close()) + }) + // this listener won't free the port even after Close is called, leading to port binding conflicts later + // in the test + l = &fakeCloseListener{inner: l} + } + timesCalled++ + return l, err + } + t.Cleanup(func() { + netListen = net.Listen + }) + + wd, erWd := os.Getwd() + require.NoError(t, erWd, "cannot get working directory") + + testBinary := filepath.Join(wd, "..", "..", "..", "..", "internal", "edot", "testing", "testing") + require.FileExists(t, testBinary, "testing binary not found") + + const waitTimeForStop = 30 * time.Second + + base, obs := loggertest.New("base") + l := base.Named("otel-manager") + ctx := t.Context() + + t.Cleanup(func() { + if t.Failed() { + for _, log := range obs.All() { + t.Logf("%+v", log) + } + } + }) + + // the execution mode passed here is overridden below so it is irrelevant + m, err := NewOTelManager( + l, + logp.InfoLevel, + base, + &info.AgentInfo{}, + nil, + nil, + waitTimeForStop, + ) + require.NoError(t, err, "could not create otel manager") + executionMode, err := newSubprocessExecution(testBinary, strings.TrimPrefix(m.healthCheckExtID, "extension:healthcheckv2/"), 0, 0) + require.NoError(t, err, "could not create subprocess execution mode") + m.execution = executionMode +>>>>>>> 85b7e9932 ((bugfix) log level does not change when standalone agent is reloaded or when otel runtime is used (#11998)) go func() { err := m.Run(ctx) @@ -268,12 +1270,30 @@ func TestOTelManager_Logging(t *testing.T) { cfg := confmap.NewFromStringMap(testConfig) m.Update(cfg) +<<<<<<< HEAD // the collector should log to the base logger assert.EventuallyWithT(t, func(collect *assert.CollectT) { logs := obs.All() require.NotEmpty(collect, logs, "Logs should not be empty") firstMessage := logs[0].Message assert.Equal(collect, "Internal metrics telemetry disabled", firstMessage) +======= + // no retries, collector is not running + assert.Equal(t, uint32(0), m.recoveryRetries.Load()) + + m.Update(cfg, nil, logp.InfoLevel, nil) + + // wait until status reflects the config update + require.EventuallyWithT(t, func(collect *assert.CollectT) { + select { + case collectorStatus := <-m.WatchCollector(): + require.NotNil(collect, collectorStatus, "collector status should not be nil") + assert.Equal(collect, componentstatus.StatusOK, collectorStatus.Status()) + assert.NotEmpty(collect, collectorStatus.ComponentStatusMap) + case <-ctx.Done(): + require.NoError(collect, ctx.Err()) + } +>>>>>>> 85b7e9932 ((bugfix) log level does not change when standalone agent is 
reloaded or when otel runtime is used (#11998)) }, time.Second*10, time.Second) } @@ -309,3 +1329,1040 @@ func toSerializableStatus(s *status.AggregateStatus) *serializableStatus { } return outputStruct } +<<<<<<< HEAD +======= + +// Mock function for BeatMonitoringConfigGetter +func mockBeatMonitoringConfigGetter(unitID, binary string) map[string]any { + return map[string]any{"test": "config"} +} + +// Helper function to create test logger +func newTestLogger() *logger.Logger { + l, _ := loggertest.New("test") + return l +} + +func TestOTelManager_buildMergedConfig(t *testing.T) { + // Common parameters used across all test cases + commonAgentInfo := &info.AgentInfo{} + commonBeatMonitoringConfigGetter := mockBeatMonitoringConfigGetter + testComp := testComponent("test-component") + + tests := []struct { + name string + collectorCfg *confmap.Conf + components []component.Component + expectedKeys []string + expectedErrorString string + expectedLogLevel string + }{ + { + name: "nil config returns nil", + collectorCfg: nil, + components: nil, + expectedLogLevel: "", + }, + { + name: "empty config returns empty config", + collectorCfg: nil, + components: nil, + expectedKeys: []string{}, + expectedLogLevel: "", + }, + { + name: "collector config only", + collectorCfg: confmap.NewFromStringMap(testConfig), + components: nil, + expectedKeys: []string{"receivers", "exporters", "service", "processors"}, + expectedLogLevel: "info", + }, + { + name: "components only", + collectorCfg: nil, + components: []component.Component{testComp}, + expectedKeys: []string{"receivers", "exporters", "service"}, + expectedLogLevel: "DEBUG", + }, + { + name: "both collector config and components", + collectorCfg: confmap.NewFromStringMap(testConfig), + components: []component.Component{testComp}, + expectedKeys: []string{"receivers", "exporters", "service", "processors"}, + expectedLogLevel: "info", + }, + { + name: "component config generation error", + collectorCfg: nil, + components: []component.Component{{ + ID: "test-component", + InputType: "filestream", // Supported input type + OutputType: "elasticsearch", // Supported output type + // Missing InputSpec which should cause an error during config generation + }}, + expectedErrorString: "failed to generate otel config: unknown otel receiver type for input type: filestream", + expectedLogLevel: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfgUpdate := configUpdate{ + collectorCfg: tt.collectorCfg, + components: tt.components, + logLevel: logp.DebugLevel, + } + result, err := buildMergedConfig(cfgUpdate, commonAgentInfo, commonBeatMonitoringConfigGetter, logptest.NewTestingLogger(t, "")) + + if tt.expectedErrorString != "" { + assert.Error(t, err) + assert.Equal(t, tt.expectedErrorString, err.Error()) + assert.Nil(t, result) + return + } + + assert.NoError(t, err) + + if len(tt.expectedKeys) == 0 { + assert.Nil(t, result) + return + } + + // assert log level provided by user is given precedence. 
+ if tt.expectedLogLevel != "" { + assert.Equal(t, tt.expectedLogLevel, result.Get("service::telemetry::logs::level")) + } + + require.NotNil(t, result) + for _, key := range tt.expectedKeys { + assert.True(t, result.IsSet(key), "Expected key %s to be set", key) + } + }) + } +} + +func TestOTelManager_handleOtelStatusUpdate(t *testing.T) { + // Common test component used across test cases + testComp := testComponent("test-component") + + tests := []struct { + name string + components []component.Component + inputStatus *status.AggregateStatus + expectedErrorString string + expectedCollectorStatus *status.AggregateStatus + expectedComponentStates []runtime.ComponentComponentState + }{ + { + name: "successful status update with component states", + components: []component.Component{testComp}, + inputStatus: &status.AggregateStatus{ + Event: componentstatus.NewEvent(componentstatus.StatusOK), + ComponentStatusMap: map[string]*status.AggregateStatus{ + // This represents a pipeline for our component (with OtelNamePrefix) + "pipeline:logs/_agent-component/test-component": { + Event: componentstatus.NewEvent(componentstatus.StatusOK), + ComponentStatusMap: map[string]*status.AggregateStatus{ + "receiver:filebeat/_agent-component/test-component": { + Event: componentstatus.NewEvent(componentstatus.StatusOK), + }, + "exporter:elasticsearch/_agent-component/test-component": { + Event: componentstatus.NewEvent(componentstatus.StatusOK), + }, + }, + }, + // This represents a regular collector pipeline (should remain after cleaning) + "pipeline:logs": { + Event: componentstatus.NewEvent(componentstatus.StatusOK), + }, + "extensions": { + Event: componentstatus.NewEvent(componentstatus.StatusOK), + ComponentStatusMap: map[string]*status.AggregateStatus{ + "extension:beatsauth/test": { + Event: componentstatus.NewEvent(componentstatus.StatusOK), + }, + "extension:elastic_diagnostics/test": { + Event: componentstatus.NewEvent(componentstatus.StatusOK), + }, + "extension:healthcheckv2/uuid": { + Event: componentstatus.NewEvent(componentstatus.StatusOK), + }, + }, + }, + }, + }, + expectedCollectorStatus: &status.AggregateStatus{ + Event: componentstatus.NewEvent(componentstatus.StatusOK), + ComponentStatusMap: map[string]*status.AggregateStatus{ + // This represents a regular collector pipeline (should remain after cleaning) + "pipeline:logs": { + Event: componentstatus.NewEvent(componentstatus.StatusOK), + }, + }, + }, + expectedComponentStates: []runtime.ComponentComponentState{ + { + Component: testComp, + State: runtime.ComponentState{ + State: client.UnitStateHealthy, + Message: "Healthy", + Units: map[runtime.ComponentUnitKey]runtime.ComponentUnitState{ + runtime.ComponentUnitKey{ + UnitID: "filestream-unit", + UnitType: client.UnitTypeInput, + }: { + State: client.UnitStateHealthy, + Message: "Healthy", + Payload: map[string]any{ + "streams": map[string]map[string]string{ + "test-1": { + "error": "", + "status": client.UnitStateHealthy.String(), + }, + "test-2": { + "error": "", + "status": client.UnitStateHealthy.String(), + }, + }, + }, + }, + runtime.ComponentUnitKey{ + UnitID: "filestream-default", + UnitType: client.UnitTypeOutput, + }: { + State: client.UnitStateHealthy, + Message: "Healthy", + }, + }, + VersionInfo: runtime.ComponentVersionInfo{ + Name: translate.OtelComponentName, + Meta: map[string]string{ + "build_time": version.BuildTime().String(), + "commit": version.Commit(), + }, + BuildHash: version.Commit(), + }, + }, + }, + }, + }, + { + name: "handles nil otel status", + 
components: []component.Component{}, + inputStatus: nil, + expectedCollectorStatus: nil, + expectedComponentStates: nil, + }, + { + name: "handles empty components list", + components: []component.Component{}, + inputStatus: &status.AggregateStatus{ + Event: componentstatus.NewEvent(componentstatus.StatusOK), + }, + expectedErrorString: "", + expectedCollectorStatus: &status.AggregateStatus{ + Event: componentstatus.NewEvent(componentstatus.StatusOK), + }, + expectedComponentStates: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mgr := &OTelManager{ + logger: newTestLogger(), + components: tt.components, + healthCheckExtID: "extension:healthcheckv2/uuid", + currentComponentStates: make(map[string]runtime.ComponentComponentState), + } + + componentStates, err := mgr.handleOtelStatusUpdate(tt.inputStatus) + + // Verify error expectation + if tt.expectedErrorString != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tt.expectedErrorString) + return + } + + require.NoError(t, err) + + // Compare component states + assert.Equal(t, tt.expectedComponentStates, componentStates) + + // Compare collector status + assertOtelStatusesEqualIgnoringTimestamps(t, tt.expectedCollectorStatus, mgr.currentCollectorStatus) + }) + } +} + +func TestOTelManager_processComponentStates(t *testing.T) { + tests := []struct { + name string + currentComponentStates map[string]runtime.ComponentComponentState + inputComponentStates []runtime.ComponentComponentState + expectedOutputStates []runtime.ComponentComponentState + expectedCurrentStatesAfter map[string]runtime.ComponentComponentState + }{ + { + name: "empty input and current states", + currentComponentStates: map[string]runtime.ComponentComponentState{}, + inputComponentStates: []runtime.ComponentComponentState{}, + expectedOutputStates: []runtime.ComponentComponentState{}, + expectedCurrentStatesAfter: map[string]runtime.ComponentComponentState{}, + }, + { + name: "new component state added", + currentComponentStates: map[string]runtime.ComponentComponentState{}, + inputComponentStates: []runtime.ComponentComponentState{ + { + Component: component.Component{ID: "comp1"}, + State: runtime.ComponentState{State: client.UnitStateHealthy}, + }, + }, + expectedOutputStates: []runtime.ComponentComponentState{ + { + Component: component.Component{ID: "comp1"}, + State: runtime.ComponentState{State: client.UnitStateHealthy}, + }, + }, + expectedCurrentStatesAfter: map[string]runtime.ComponentComponentState{ + "comp1": { + Component: component.Component{ID: "comp1"}, + State: runtime.ComponentState{State: client.UnitStateHealthy}, + }, + }, + }, + { + name: "component removed from config generates STOPPED state", + currentComponentStates: map[string]runtime.ComponentComponentState{ + "comp1": { + Component: component.Component{ID: "comp1"}, + State: runtime.ComponentState{State: client.UnitStateHealthy}, + }, + }, + inputComponentStates: []runtime.ComponentComponentState{}, + expectedOutputStates: []runtime.ComponentComponentState{ + { + Component: component.Component{ID: "comp1"}, + State: runtime.ComponentState{State: client.UnitStateStopped}, + }, + }, + expectedCurrentStatesAfter: map[string]runtime.ComponentComponentState{}, + }, + { + name: "component stopped removes from current states", + currentComponentStates: map[string]runtime.ComponentComponentState{ + "comp1": { + Component: component.Component{ID: "comp1"}, + State: runtime.ComponentState{State: client.UnitStateHealthy}, + }, + }, + inputComponentStates: 
[]runtime.ComponentComponentState{ + { + Component: component.Component{ID: "comp1"}, + State: runtime.ComponentState{State: client.UnitStateStopped}, + }, + }, + expectedOutputStates: []runtime.ComponentComponentState{ + { + Component: component.Component{ID: "comp1"}, + State: runtime.ComponentState{State: client.UnitStateStopped}, + }, + }, + expectedCurrentStatesAfter: map[string]runtime.ComponentComponentState{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mgr := &OTelManager{ + logger: newTestLogger(), + currentComponentStates: tt.currentComponentStates, + } + + result := mgr.processComponentStates(tt.inputComponentStates) + + assert.ElementsMatch(t, tt.expectedOutputStates, result) + assert.Equal(t, tt.expectedCurrentStatesAfter, mgr.currentComponentStates) + }) + } +} + +// TestOTelManagerEndToEnd tests the full lifecycle of the OTelManager including configuration updates, status updates, +// and error handling. This test only uses synthetic errors and statuses, and the mock execution used doesn't behave +// exactly like the real executions. +func TestOTelManagerEndToEnd(t *testing.T) { + // Setup test logger and dependencies + testLogger, _ := loggertest.New("test") + agentInfo := &info.AgentInfo{} + beatMonitoringConfigGetter := mockBeatMonitoringConfigGetter + collectorStarted := make(chan struct{}) + + execution := &mockExecution{ + collectorStarted: collectorStarted, + } + + // Create manager with test dependencies + mgr := OTelManager{ + logger: testLogger, + baseLogger: testLogger, + errCh: make(chan error, 1), // holds at most one error + updateCh: make(chan configUpdate, 1), + collectorStatusCh: make(chan *status.AggregateStatus, 1), + componentStateCh: make(chan []runtime.ComponentComponentState, 5), + doneChan: make(chan struct{}), + recoveryTimer: newRestarterNoop(), + execution: execution, + agentInfo: agentInfo, + beatMonitoringConfigGetter: beatMonitoringConfigGetter, + collectorRunErr: make(chan error), + } + + // Start manager in a goroutine + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5) + defer cancel() + + go func() { + err := mgr.Run(ctx) + assert.ErrorIs(t, err, context.Canceled) + }() + + collectorCfg := confmap.NewFromStringMap(map[string]interface{}{ + "receivers": map[string]interface{}{ + "nop": map[string]interface{}{}, + }, + "exporters": map[string]interface{}{"nop": map[string]interface{}{}}, + "service": map[string]interface{}{ + "pipelines": map[string]interface{}{ + "metrics": map[string]interface{}{ + "receivers": []string{"nop"}, + "exporters": []string{"nop"}, + }, + }, + }, + }) + + testComp := testComponent("test") + components := []component.Component{testComp} + + t.Run("collector config is passed down to the collector execution", func(t *testing.T) { + mgr.Update(collectorCfg, nil, logp.InfoLevel, nil) + select { + case <-collectorStarted: + case <-ctx.Done(): + t.Fatal("timeout waiting for collector config update") + } + expectedCfg := confmap.NewFromStringMap(collectorCfg.ToStringMap()) + assert.NoError(t, injectDiagnosticsExtension(expectedCfg)) + assert.NoError(t, addCollectorMetricsReader(expectedCfg)) + assert.Equal(t, expectedCfg, execution.cfg) + + }) + + t.Run("collector status is passed up to the component manager", func(t *testing.T) { + otelStatus := &status.AggregateStatus{ + Event: componentstatus.NewEvent(componentstatus.StatusOK), + } + + select { + case <-ctx.Done(): + t.Fatal("timeout waiting for collector status update") + case execution.statusCh <- otelStatus: 
+ } + + componentStates, err := getFromChannelOrErrorWithContext(t, ctx, mgr.WatchComponents(), mgr.Errors()) + require.NoError(t, err) + assert.Empty(t, componentStates) + collectorStatus, err := getFromChannelOrErrorWithContext(t, ctx, mgr.WatchCollector(), mgr.Errors()) + require.NoError(t, err) + assert.Equal(t, otelStatus, collectorStatus) + }) + + t.Run("component config is passed down to the otel manager", func(t *testing.T) { + mgr.Update(collectorCfg, nil, logp.InfoLevel, components) + select { + case <-collectorStarted: + case <-ctx.Done(): + t.Fatal("timeout waiting for collector config update") + } + cfg := execution.cfg + require.NotNil(t, cfg) + receivers, err := cfg.Sub("receivers") + require.NoError(t, err) + require.NotNil(t, receivers) + assert.True(t, receivers.IsSet("nop")) + assert.True(t, receivers.IsSet("filebeatreceiver/_agent-component/test")) + }) + + t.Run("empty collector config leaves the component config running", func(t *testing.T) { + mgr.Update(nil, nil, logp.InfoLevel, components) + select { + case <-collectorStarted: + case <-ctx.Done(): + t.Fatal("timeout waiting for collector config update") + } + cfg := execution.cfg + require.NotNil(t, cfg) + receivers, err := cfg.Sub("receivers") + require.NoError(t, err) + require.NotNil(t, receivers) + assert.False(t, receivers.IsSet("nop")) + assert.True(t, receivers.IsSet("filebeatreceiver/_agent-component/test")) + }) + + t.Run("collector status with components is passed up to the component manager", func(t *testing.T) { + otelStatus := &status.AggregateStatus{ + Event: componentstatus.NewEvent(componentstatus.StatusOK), + ComponentStatusMap: map[string]*status.AggregateStatus{ + // This represents a pipeline for our component (with OtelNamePrefix) + "pipeline:logs/_agent-component/test": { + Event: componentstatus.NewEvent(componentstatus.StatusOK), + ComponentStatusMap: map[string]*status.AggregateStatus{ + "receiver:filebeatreceiver/_agent-component/test": { + Event: componentstatus.NewEvent(componentstatus.StatusOK), + }, + "exporter:elasticsearch/_agent-component/test": { + Event: componentstatus.NewEvent(componentstatus.StatusOK), + }, + }, + }, + }, + } + + select { + case <-ctx.Done(): + t.Fatal("timeout waiting for collector status update") + case execution.statusCh <- otelStatus: + } + + componentState, err := getFromChannelOrErrorWithContext(t, ctx, mgr.WatchComponents(), mgr.Errors()) + require.NoError(t, err) + require.NotNil(t, componentState) + require.Len(t, componentState, 1) + assert.Equal(t, componentState[0].Component, testComp) + + collectorStatus, err := getFromChannelOrErrorWithContext(t, ctx, mgr.WatchCollector(), mgr.Errors()) + require.NoError(t, err) + require.NotNil(t, collectorStatus) + assert.Len(t, collectorStatus.ComponentStatusMap, 0) + }) + + t.Run("collector execution error is passed as status not error", func(t *testing.T) { + collectorErr := errors.New("collector error") + + var err error + var aggStatus *status.AggregateStatus + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case aggStatus = <-mgr.WatchCollector(): + case <-mgr.WatchComponents(): + // don't block (ignored for test) + case e := <-mgr.Errors(): + err = e + if err != nil { + // only return if real error (nil is just clearing the error state) + return + } + case <-time.After(time.Second): + // didn't get an error (good!) 
+ return + } + } + }() + + select { + case <-ctx.Done(): + t.Fatal("timeout waiting for collector status update") + case execution.errCh <- collectorErr: + } + wg.Wait() + + // should not come in as an error + require.Nil(t, err, "got unexpected error from the collector execution") + + // should have a fatal error in status + require.NotNil(t, aggStatus) + assert.Equal(t, aggStatus.Status(), componentstatus.StatusFatalError) + }) +} + +// TestManagerAlwaysEmitsStoppedStatesForComponents checks that the manager always emits a STOPPED state for a component +// at least once, even if we're slow to retrieve the state. This is part of the contract with the coordinator. +func TestManagerAlwaysEmitsStoppedStatesForComponents(t *testing.T) { + // Setup test logger and dependencies + testLogger, _ := loggertest.New("test") + beatMonitoringConfigGetter := mockBeatMonitoringConfigGetter + collectorStarted := make(chan struct{}) + + execution := &mockExecution{ + collectorStarted: collectorStarted, + } + + // Create manager with test dependencies + mgr, err := NewOTelManager( + testLogger, + logp.InfoLevel, + testLogger, + &info.AgentInfo{}, + nil, + beatMonitoringConfigGetter, + time.Second, + ) + require.NoError(t, err) + mgr.recoveryTimer = newRestarterNoop() + mgr.execution = execution + + // Start manager in a goroutine + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5) + defer cancel() + + go func() { + err := mgr.Run(ctx) + assert.ErrorIs(t, err, context.Canceled) + }() + + testComp := testComponent("test") + components := []component.Component{testComp} + otelStatus := &status.AggregateStatus{ + Event: componentstatus.NewEvent(componentstatus.StatusOK), + ComponentStatusMap: map[string]*status.AggregateStatus{ + // This represents a pipeline for our component (with OtelNamePrefix) + "pipeline:logs/_agent-component/test": { + Event: componentstatus.NewEvent(componentstatus.StatusOK), + ComponentStatusMap: map[string]*status.AggregateStatus{ + "receiver:filebeatreceiver/_agent-component/test": { + Event: componentstatus.NewEvent(componentstatus.StatusOK), + }, + "exporter:elasticsearch/_agent-component/test": { + Event: componentstatus.NewEvent(componentstatus.StatusOK), + }, + }, + }, + }, + } + // start the collector by giving it a mock config + mgr.Update(nil, nil, logp.InfoLevel, components) + select { + case <-ctx.Done(): + t.Fatal("timeout waiting for collector status update") + case <-execution.collectorStarted: + } + + // send the status from the execution + select { + case <-ctx.Done(): + t.Fatal("timeout waiting for collector status update") + case execution.statusCh <- otelStatus: + } + + // verify we get the component running state from the manager + componentStates, err := getFromChannelOrErrorWithContext(t, ctx, mgr.WatchComponents(), mgr.Errors()) + require.NoError(t, err) + require.NotNil(t, componentStates) + require.Len(t, componentStates, 1) + componentState := componentStates[0] + assert.Equal(t, componentState.State.State, client.UnitStateHealthy) + + // stop the component by sending a nil config + mgr.Update(nil, nil, logp.InfoLevel, nil) + + // then send a nil status, indicating the collector is not running the component anymore + // do this a few times to see if the STOPPED state isn't lost along the way + for range 3 { + reportCollectorStatus(ctx, execution.statusCh, nil) + time.Sleep(time.Millisecond * 100) // TODO: Replace this with synctest after we upgrade to Go 1.25 + } + + // verify that we get a STOPPED state for the component + 
assert.EventuallyWithT(t, func(collect *assert.CollectT) { + componentStates, err := getFromChannelOrErrorWithContext(t, ctx, mgr.WatchComponents(), mgr.Errors()) + require.NoError(collect, err) + require.NotNil(collect, componentStates) + require.Len(collect, componentStates, 1) + componentState := componentStates[0] + assert.Equal(collect, componentState.State.State, client.UnitStateStopped) + }, time.Millisecond, time.Second*5) +} + +func TestManagerEmitsStartingStatesWhenHealthcheckIsUnavailable(t *testing.T) { + testLogger, _ := loggertest.New("test") + agentInfo := &info.AgentInfo{} + beatMonitoringConfigGetter := mockBeatMonitoringConfigGetter + collectorStarted := make(chan struct{}) + + execution := &mockExecution{ + collectorStarted: collectorStarted, + } + + // Create manager with test dependencies + mgr, err := NewOTelManager( + testLogger, + logp.InfoLevel, + testLogger, + agentInfo, + nil, + beatMonitoringConfigGetter, + time.Second, + ) + require.NoError(t, err) + mgr.recoveryTimer = newRestarterNoop() + mgr.execution = execution + + // Start manager in a goroutine + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5) + defer cancel() + + go func() { + err := mgr.Run(ctx) + assert.ErrorIs(t, err, context.Canceled) + }() + + testComp := testComponent("test") + components := []component.Component{testComp} + otelStatus := &status.AggregateStatus{ + Event: componentstatus.NewEvent(componentstatus.StatusStarting), + } + // start the collector by giving it a mock config + mgr.Update(nil, nil, logp.InfoLevel, components) + select { + case <-ctx.Done(): + t.Fatal("timeout waiting for collector status update") + case <-execution.collectorStarted: + } + + // send the status from the execution + select { + case <-ctx.Done(): + t.Fatal("timeout waiting for collector status update") + case execution.statusCh <- otelStatus: + } + + // verify we get the component Starting state from the manager + componentStates, err := getFromChannelOrErrorWithContext(t, ctx, mgr.WatchComponents(), mgr.Errors()) + require.NoError(t, err) + require.NotNil(t, componentStates) + require.Len(t, componentStates, 1) + componentState := componentStates[0] + assert.Equal(t, componentState.State.State, client.UnitStateStarting) + assert.Equal(t, componentState.State.Message, "STARTING") + + // stop the component by sending a nil config + mgr.Update(nil, nil, logp.InfoLevel, nil) + + // then send a nil status, indicating the collector is not running the component anymore + // do this a few times to see if the STOPPED state isn't lost along the way + for range 3 { + reportCollectorStatus(ctx, execution.statusCh, nil) + time.Sleep(time.Millisecond * 100) // TODO: Replace this with synctest after we upgrade to Go 1.25 + } + + // verify that we get a STOPPED state for the component + assert.EventuallyWithT(t, func(collect *assert.CollectT) { + componentStates, err := getFromChannelOrErrorWithContext(t, ctx, mgr.WatchComponents(), mgr.Errors()) + require.NoError(collect, err) + require.NotNil(collect, componentStates) + require.Len(collect, componentStates, 1) + componentState := componentStates[0] + assert.Equal(collect, componentState.State.State, client.UnitStateStopped) + }, time.Millisecond, time.Second*5) +} + +func getFromChannelOrErrorWithContext[T any](t *testing.T, ctx context.Context, ch <-chan T, errCh <-chan error) (T, error) { + t.Helper() + var result T + var err error + for err == nil { + select { + case result = <-ch: + return result, nil + case err = <-errCh: + case <-ctx.Done(): + 
err = ctx.Err() + } + } + return result, err +} + +func assertOtelStatusesEqualIgnoringTimestamps(t require.TestingT, a, b *status.AggregateStatus) bool { + if a == nil || b == nil { + return assert.Equal(t, a, b) + } + + if !assert.Equal(t, a.Status(), b.Status()) { + return false + } + + if !assert.Equal(t, len(a.ComponentStatusMap), len(b.ComponentStatusMap)) { + return false + } + + for k, v := range a.ComponentStatusMap { + if !assertOtelStatusesEqualIgnoringTimestamps(t, v, b.ComponentStatusMap[k]) { + return false + } + } + + return true +} + +func TestCalculateConfmapHash(t *testing.T) { + t.Run("nil config returns zero", func(t *testing.T) { + hash, err := calculateConfmapHash(nil) + require.NoError(t, err) + assert.Equal(t, []byte(nil), hash) + }) + + t.Run("same value gives same result", func(t *testing.T) { + conf := confmap.NewFromStringMap(map[string]any{ + "key1": "value1", + "key2": 123, + }) + hash1, err := calculateConfmapHash(conf) + require.NoError(t, err) + hash2, err := calculateConfmapHash(conf) + require.NoError(t, err) + assert.Equal(t, hash1, hash2) + }) + + t.Run("different values give different results", func(t *testing.T) { + conf1 := confmap.NewFromStringMap(map[string]any{ + "key1": "value1", + }) + hash1, err := calculateConfmapHash(conf1) + require.NoError(t, err) + + conf2 := confmap.NewFromStringMap(map[string]any{ + "key1": "value2", + }) + hash2, err := calculateConfmapHash(conf2) + require.NoError(t, err) + + assert.NotEqual(t, hash1, hash2) + }) + + t.Run("list of maps is processed correctly", func(t *testing.T) { + conf1 := confmap.NewFromStringMap(map[string]any{ + "items": []any{ + map[string]any{"name": "A", "value": 1}, + map[string]any{"name": "B", "value": 2}, + }, + }) + hash1, err := calculateConfmapHash(conf1) + require.NoError(t, err) + + t.Run("same list of maps gives same hash", func(t *testing.T) { + conf2 := confmap.NewFromStringMap(map[string]any{ + "items": []any{ + map[string]any{"name": "A", "value": 1}, + map[string]any{"name": "B", "value": 2}, + }, + }) + hash2, err := calculateConfmapHash(conf2) + require.NoError(t, err) + assert.Equal(t, hash1, hash2) + }) + + t.Run("different order in list gives different hash", func(t *testing.T) { + conf3 := confmap.NewFromStringMap(map[string]any{ + "items": []any{ + map[string]any{"name": "B", "value": 2}, + map[string]any{"name": "A", "value": 1}, + }, + }) + hash3, err := calculateConfmapHash(conf3) + require.NoError(t, err) + assert.NotEqual(t, hash1, hash3) + }) + }) +} + +func TestOTelManager_maybeUpdateMergedConfig(t *testing.T) { + t.Run("initial config", func(t *testing.T) { + m := &OTelManager{} + conf := confmap.NewFromStringMap(testConfig) + + updated, err := m.maybeUpdateMergedConfig(conf) + + require.NoError(t, err) + assert.True(t, updated) + assert.Equal(t, conf, m.mergedCollectorCfg) + assert.NotEqual(t, uint64(0), m.mergedCollectorCfgHash) + }) + + t.Run("same config", func(t *testing.T) { + conf := confmap.NewFromStringMap(testConfig) + hash, err := calculateConfmapHash(conf) + require.NoError(t, err) + + m := &OTelManager{ + mergedCollectorCfg: conf, + mergedCollectorCfgHash: hash, + } + + updated, err := m.maybeUpdateMergedConfig(conf) + + require.NoError(t, err) + assert.False(t, updated) + assert.Equal(t, conf, m.mergedCollectorCfg) + assert.Equal(t, hash, m.mergedCollectorCfgHash) + }) + + t.Run("different config", func(t *testing.T) { + conf1 := confmap.NewFromStringMap(map[string]any{"key": "value1"}) + hash1, err := calculateConfmapHash(conf1) + require.NoError(t, 
err) + + m := &OTelManager{ + mergedCollectorCfg: conf1, + mergedCollectorCfgHash: hash1, + } + + conf2 := confmap.NewFromStringMap(map[string]any{"key": "value2"}) + hash2, err := calculateConfmapHash(conf2) + require.NoError(t, err) + + updated, err := m.maybeUpdateMergedConfig(conf2) + + require.NoError(t, err) + assert.True(t, updated) + assert.Equal(t, conf2, m.mergedCollectorCfg) + assert.Equal(t, hash2, m.mergedCollectorCfgHash) + assert.NotEqual(t, hash1, m.mergedCollectorCfgHash) + }) + + t.Run("hashing error with previous config", func(t *testing.T) { + conf1 := confmap.NewFromStringMap(map[string]any{"key": "value1"}) + hash1, err := calculateConfmapHash(conf1) + require.NoError(t, err) + + m := &OTelManager{ + mergedCollectorCfg: conf1, + mergedCollectorCfgHash: hash1, + } + + badConf := confmap.NewFromStringMap(map[string]any{"bad": make(chan int)}) + updated, err := m.maybeUpdateMergedConfig(badConf) + + require.Error(t, err) + assert.True(t, updated, "update should proceed on hashing error") + assert.Equal(t, badConf, m.mergedCollectorCfg) + assert.Equal(t, []byte(nil), m.mergedCollectorCfgHash) + }) + + t.Run("hashing error with no previous config", func(t *testing.T) { + m := &OTelManager{} + + badConf := confmap.NewFromStringMap(map[string]any{"bad": make(chan int)}) + updated, err := m.maybeUpdateMergedConfig(badConf) + + require.Error(t, err) + assert.True(t, updated, "update should proceed on hashing error, even with no previous config") + assert.Equal(t, badConf, m.mergedCollectorCfg) + assert.Equal(t, []byte(nil), m.mergedCollectorCfgHash) + }) +} + +func TestAddCollectorMetricsPort(t *testing.T) { + expectedReader := map[string]any{ + "pull": map[string]any{ + "exporter": map[string]any{ + "prometheus": map[string]any{ + "host": "localhost", + "port": fmt.Sprintf("${env:%s}", componentmonitoring.OtelCollectorMetricsPortEnvVarName), + "without_scope_info": true, + "without_units": true, + "without_type_suffix": true, + }, + }, + }, + } + otelConfigWithReaders := func(readers any) *confmap.Conf { + baseConf := confmap.NewFromStringMap(testConfig) + err := baseConf.Merge(confmap.NewFromStringMap(map[string]any{ + "service": map[string]any{ + "telemetry": map[string]any{ + "metrics": map[string]any{ + "readers": readers, + }, + }, + }, + })) + require.NoError(t, err) + return baseConf + } + + t.Run("readers does not exist", func(t *testing.T) { + conf := otelConfigWithReaders(nil) + err := addCollectorMetricsReader(conf) + require.NoError(t, err) + + readers := conf.Get("service::telemetry::metrics::readers") + require.NotNil(t, readers) + readersList, ok := readers.([]any) + require.True(t, ok) + require.Len(t, readersList, 1) + + assert.Equal(t, expectedReader, readersList[0]) + }) + + t.Run("readers is an empty list", func(t *testing.T) { + conf := otelConfigWithReaders([]any{}) + err := addCollectorMetricsReader(conf) + require.NoError(t, err) + + readers := conf.Get("service::telemetry::metrics::readers") + require.NotNil(t, readers) + readersList, ok := readers.([]any) + require.True(t, ok) + require.Len(t, readersList, 1) + + assert.Equal(t, expectedReader, readersList[0]) + }) + + t.Run("readers has existing items", func(t *testing.T) { + existingReader := map[string]any{"foo": "bar"} + conf := otelConfigWithReaders([]any{existingReader}) + err := addCollectorMetricsReader(conf) + require.NoError(t, err) + + readers := conf.Get("service::telemetry::metrics::readers") + require.NotNil(t, readers) + readersList, ok := readers.([]any) + require.True(t, ok) + 
+		require.Len(t, readersList, 2)
+
+		assert.Equal(t, existingReader, readersList[0])
+		assert.Equal(t, expectedReader, readersList[1])
+	})
+
+	t.Run("readers is not a list", func(t *testing.T) {
+		conf := otelConfigWithReaders("not a list")
+		err := addCollectorMetricsReader(conf)
+		require.Error(t, err)
+		assert.Contains(t, err.Error(), "couldn't convert value of service::telemetry::metrics::readers to a list")
+	})
+}
+
+// fakeCloseListener is a wrapper around a net.Listener that ignores the Close() method. This is used in a very particular
+// port conflict test to ensure ports are not unbound while the otel collector tries to use them.
+type fakeCloseListener struct {
+	inner net.Listener
+}
+
+func (t *fakeCloseListener) Accept() (net.Conn, error) {
+	return t.inner.Accept()
+}
+
+func (t *fakeCloseListener) Close() error {
+	return nil
+}
+
+func (t *fakeCloseListener) Addr() net.Addr {
+	return t.inner.Addr()
+}
+>>>>>>> 85b7e9932 ((bugfix) log level does not change when standalone agent is reloaded or when otel runtime is used (#11998))
diff --git a/internal/pkg/otel/translate/otelconfig.go b/internal/pkg/otel/translate/otelconfig.go
index d8ccff00ada..52823ccd79c 100644
--- a/internal/pkg/otel/translate/otelconfig.go
+++ b/internal/pkg/otel/translate/otelconfig.go
@@ -100,10 +100,78 @@ func GetOtelConfig(
 	return otelConfig, nil
 }
 
+<<<<<<< HEAD
 // IsComponentOtelSupported checks if the given component can be run in an Otel Collector.
 func IsComponentOtelSupported(comp *component.Component) bool {
 	return slices.Contains(OtelSupportedOutputTypes, comp.OutputType) && slices.Contains(OtelSupportedInputTypes, comp.InputType)
+=======
+func GetOTelLogLevel(level string) string {
+	if level != "" {
+		switch strings.ToLower(level) {
+		case "debug":
+			return "DEBUG"
+		case "info":
+			return "INFO"
+		case "warning":
+			return "WARN"
+		case "error":
+			return "ERROR"
+		default:
+			return "INFO"
+		}
+	}
+	return "INFO"
+}
+
+// VerifyComponentIsOtelSupported verifies that the given component can be run in an Otel Collector. It returns an error
+// indicating what the problem is, if it can't.
+func VerifyComponentIsOtelSupported(comp *component.Component) error {
+	if !slices.Contains(OtelSupportedOutputTypes, comp.OutputType) {
+		return fmt.Errorf("unsupported output type: %s", comp.OutputType)
+	}
+
+	// check if given input is supported in OTel runtime
+	// this includes all metricbeat inputs and some filebeat inputs for now
+	if !slices.Contains(OtelSupportedInputTypes, comp.InputType) {
+		return fmt.Errorf("unsupported input type: %s", comp.InputType)
+	}
+
+	// check if the actual configuration is supported. We need to actually generate the config and look for
+	// the right kind of error
+	_, compErr := getCollectorConfigForComponent(comp, &info.AgentInfo{}, func(unitID, binary string) map[string]any {
+		return nil
+	}, logp.NewNopLogger())
+	if errors.Is(compErr, errors.ErrUnsupported) {
+		return fmt.Errorf("unsupported configuration for %s: %w", comp.ID, compErr)
+	}
+
+	return nil
+}
+
+// VerifyOutputIsOtelSupported verifies that the given output can be converted into an Otel Collector exporter. It
+// returns an error indicating what the problem is, if it can't.
+func VerifyOutputIsOtelSupported(outputType string, outputCfg map[string]any) error {
+	if !slices.Contains(OtelSupportedOutputTypes, outputType) {
+		return fmt.Errorf("unsupported output type: %s", outputType)
+	}
+	exporterType, err := OutputTypeToExporterType(outputType)
+	if err != nil {
+		return err
+	}
+
+	outputCfgC, err := config.NewConfigFrom(outputCfg)
+	if err != nil {
+		return err
+	}
+
+	_, err = OutputConfigToExporterConfig(logp.NewNopLogger(), exporterType, outputCfgC)
+	if errors.Is(err, errors.ErrUnsupported) {
+		return fmt.Errorf("unsupported configuration for %s: %w", outputType, err)
+	}
+
+	return nil
+>>>>>>> 85b7e9932 ((bugfix) log level does not change when standalone agent is reloaded or when otel runtime is used (#11998))
 }
 
 // getSupportedComponents returns components from the given model that can be run in an Otel Collector.
diff --git a/testing/integration/ess/otel_test.go b/testing/integration/ess/otel_test.go
index 9d7db858c1d..aa73371d108 100644
--- a/testing/integration/ess/otel_test.go
+++ b/testing/integration/ess/otel_test.go
@@ -22,6 +22,13 @@ import (
 	"text/template"
 	"time"
 
+<<<<<<< HEAD
+=======
+	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
+	"go.uber.org/zap/zaptest/observer"
+
+>>>>>>> 85b7e9932 ((bugfix) log level does not change when standalone agent is reloaded or when otel runtime is used (#11998))
 	"github.com/google/go-cmp/cmp"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -32,6 +39,7 @@ import (
 	"github.com/elastic/elastic-agent-libs/testing/estools"
 	"github.com/elastic/elastic-agent-libs/transport/tlscommontest"
 	"github.com/elastic/elastic-agent/pkg/control/v2/client"
+	"github.com/elastic/elastic-agent/pkg/control/v2/cproto"
 	aTesting "github.com/elastic/elastic-agent/pkg/testing"
 	"github.com/elastic/elastic-agent/pkg/testing/define"
 	"github.com/elastic/elastic-agent/pkg/testing/tools/testcontext"
@@ -2152,3 +2160,425 @@ service:
 	cancel()
 }
+<<<<<<< HEAD
+=======
+
+func TestOtelBeatsAuthExtensionInvalidCertificates(t *testing.T) {
+	info := define.Require(t, define.Requirements{
+		Group: integration.Default,
+		Local: true,
+		OS: []define.OS{
+			// {Type: define.Windows}, we don't support otel on Windows yet
+			{Type: define.Linux},
+			{Type: define.Darwin},
+		},
+		Stack: &define.Stack{},
+	})
+
+	// Create the otel configuration file
+	type otelConfigOptions struct {
+		ESEndpoint string
+		ESApiKey   string
+		Index      string
+	}
+	esEndpoint, err := integration.GetESHost()
+	require.NoError(t, err, "error getting elasticsearch endpoint")
+	esApiKey, err := createESApiKey(info.ESClient)
+	require.NoError(t, err, "error creating API key")
+	require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey)
+	index := "logs-integration-" + info.Namespace
+
+	fixture, err := define.NewFixtureFromLocalBuild(t, define.Version())
+	require.NoError(t, err)
+
+	ctx, cancel := testcontext.WithDeadline(t, t.Context(), time.Now().Add(5*time.Minute))
+	defer cancel()
+	err = fixture.Prepare(ctx)
+	require.NoError(t, err)
+
+	otelConfigTemplate := `
+extensions:
+  beatsauth:
+    continue_on_error: true
+    ssl:
+      enabled: true
+      verification_mode: none
+      certificate: /nonexistent.pem
+      key: /nonexistent.key
+      key_passphrase: null
+      key_passphrase_path: null
+receivers:
+  metricbeatreceiver:
+    metricbeat:
+      modules:
+        - module: system
+          enabled: true
+          period: 1s
+          processes:
+            - '.*'
+          metricsets:
+            - cpu
+    queue.mem.flush.timeout: 0s
+exporters:
+  elasticsearch/log:
+    endpoints:
+      - {{.ESEndpoint}}
+    api_key: {{.ESApiKey}}
+    logs_index: {{.Index}}
+    sending_queue:
+      wait_for_result: true # Avoid losing data on shutdown
+      block_on_overflow: true
+      batch:
+        flush_timeout: 1s
+        min_size: 1
+    auth:
+      authenticator: beatsauth
+    mapping:
+      mode: bodymap
+service:
+  extensions: [beatsauth]
+  pipelines:
+    logs:
+      receivers:
+        - metricbeatreceiver
+      exporters:
+        - elasticsearch/log
+`
+	var otelConfigBuffer bytes.Buffer
+	require.NoError(t,
+		template.Must(template.New("otelConfig").Parse(otelConfigTemplate)).Execute(&otelConfigBuffer,
+			otelConfigOptions{
+				ESEndpoint: esEndpoint,
+				ESApiKey:   esApiKey.Encoded,
+				Index:      index,
+			}))
+
+	// configure elastic-agent.yml
+	err = fixture.Configure(ctx, otelConfigBuffer.Bytes())
+	require.NoError(t, err)
+
+	// prepare agent command
+	cmd, err := fixture.PrepareAgentCommand(ctx, nil)
+	require.NoError(t, err, "cannot prepare Elastic-Agent command: %v", err)
+
+	output := strings.Builder{}
+	cmd.Stderr = &output
+	cmd.Stdout = &output
+
+	// start elastic-agent
+	err = cmd.Start()
+	require.NoError(t, err)
+
+	t.Cleanup(func() {
+		if t.Failed() {
+			t.Log("Elastic-Agent output:")
+			t.Log(output.String())
+		}
+	})
+
+	require.EventuallyWithT(t, func(collect *assert.CollectT) {
+		var statusErr error
+		status, statusErr := fixture.ExecStatus(ctx)
+		assert.NoError(collect, statusErr)
+		require.NotNil(collect, status.Collector)
+		require.NotNil(collect, status.Collector.ComponentStatusMap)
+
+		pipelines, exists := status.Collector.ComponentStatusMap["pipeline:logs"]
+		require.True(collect, exists)
+
+		receiver, exists := pipelines.ComponentStatusMap["receiver:metricbeatreceiver"]
+		require.True(collect, exists)
+		require.EqualValues(collect, receiver.Status, cproto.State_HEALTHY)
+
+		exporter, exists := pipelines.ComponentStatusMap["exporter:elasticsearch/log"]
+		require.True(collect, exists)
+		require.EqualValues(collect, exporter.Status, cproto.State_DEGRADED)
+	}, 2*time.Minute, 5*time.Second)
+
+	cancel()
+}
+
+func TestOutputStatusReporting(t *testing.T) {
+	define.Require(t, define.Requirements{
+		Sudo:  true,
+		Group: integration.Default,
+		Local: false,
+		Stack: nil,
+		OS: []define.OS{
+			{Type: define.Windows},
+			{Type: define.Linux},
+			{Type: define.Darwin},
+		},
+	})
+
+	fixture, err := define.NewFixtureFromLocalBuild(t, define.Version())
+	require.NoError(t, err)
+
+	// Create the otel configuration file
+	type otelConfigOptions struct {
+		StatusReportingEnabled bool
+	}
+	configTemplate := `
+inputs:
+  - type: system/metrics
+    id: http-metrics-test
+    use_output: default
+    streams:
+      - metricsets:
+          - cpu
+        period: 1s
+        data_stream:
+          dataset: e2e
+          namespace: "json_namespace"
+agent.reload:
+  period: 1s
+outputs:
+  default:
+    type: elasticsearch
+    hosts: [http://localhost:9200]
+    api_key: placeholder
+    preset: "balanced"
+    status_reporting:
+      enabled: {{.StatusReportingEnabled}}
+agent.monitoring:
+  metrics: false
+  logs: false
+  http:
+    enabled: true
+    port: 6792
+agent.grpc:
+  port: 6790
+agent.internal.runtime.metricbeat:
+  system/metrics: otel
+`
+
+	var configBuffer bytes.Buffer
+	template.Must(template.New("config").Parse(configTemplate)).Execute(&configBuffer,
+		otelConfigOptions{
+			StatusReportingEnabled: true,
+		})
+	ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(5*time.Minute))
+	defer cancel()
+
+	installOpts := aTesting.InstallOpts{
+		NonInteractive: true,
+		Privileged:     true,
+		Force:          true,
+		Develop:        true,
+	}
+
+	err = fixture.Prepare(ctx)
+	require.NoError(t, err)
+
+	err = fixture.Configure(ctx, configBuffer.Bytes())
+	require.NoError(t, err)
+
+	output, err := fixture.InstallWithoutEnroll(ctx, &installOpts)
+	require.NoErrorf(t, err, "error install without enroll: %s\ncombined output:\n%s", err, string(output))
+
+	require.Eventually(t, func() bool {
+		status, err := fixture.ExecStatus(ctx)
+		if err != nil {
+			t.Logf("waiting for agent degraded: %s", err.Error())
+			return false
+		}
+		return status.State == int(cproto.State_DEGRADED)
+	}, 30*time.Second, 1*time.Second)
+
+	// Disable status reporting.
+	// This should result in HEALTHY state
+	configBuffer.Reset()
+	template.Must(template.New("config").Parse(configTemplate)).Execute(&configBuffer,
+		otelConfigOptions{
+			StatusReportingEnabled: false,
+		})
+	err = fixture.Configure(ctx, configBuffer.Bytes())
+	require.NoError(t, err)
+	require.Eventually(t, func() bool {
+		err = fixture.IsHealthy(ctx)
+		if err != nil {
+			t.Logf("waiting for agent healthy: %s", err.Error())
+			return false
+		}
+		return true
+	}, 1*time.Minute, 1*time.Second)
+
+	// Enable status reporting again and keep using localhost.
+	// This should result in DEGRADED state
+	configBuffer.Reset()
+	template.Must(template.New("config").Parse(configTemplate)).Execute(&configBuffer,
+		otelConfigOptions{
+			StatusReportingEnabled: true,
+		})
+	err = fixture.Configure(ctx, configBuffer.Bytes())
+	require.NoError(t, err)
+	require.Eventually(t, func() bool {
+		status, err := fixture.ExecStatus(ctx)
+		if err != nil {
+			t.Logf("waiting for agent degraded: %s", err.Error())
+			return false
+		}
+		return status.State == int(cproto.State_DEGRADED)
+	}, 30*time.Second, 1*time.Second)
+
+	combinedOutput, err := fixture.Uninstall(ctx, &aTesting.UninstallOpts{Force: true})
+	require.NoErrorf(t, err, "error uninstalling classic agent monitoring, err: %s, combined output: %s", err, string(combinedOutput))
+}
+
+// This tests that live reloading the log level works correctly
+func TestLogReloading(t *testing.T) {
+	define.Require(t, define.Requirements{
+		Group: integration.Default,
+		Local: true,
+		Stack: &define.Stack{},
+	})
+
+	// Flow of the test
+	// 1. Start elastic-agent with debug logs
+	// 2. Change the log level to info without restarting
+	// 3. Ensure no debug logs are printed
+	// 4. Set service::telemetry::logs::level: debug
+	// 5. Ensure service::telemetry::logs::level is given precedence even when agent logs are set to info
+
+	// Create the otel configuration file
+	type otelConfigOptions struct {
+		ESEndpoint string
+		ESApiKey   string
+		Index      string
+		CAFile     string
+	}
+
+	fixture, err := define.NewFixtureFromLocalBuild(t, define.Version())
+	require.NoError(t, err)
+
+	ctx, cancel := testcontext.WithDeadline(t, t.Context(), time.Now().Add(5*time.Minute))
+	defer cancel()
+	err = fixture.Prepare(ctx)
+	require.NoError(t, err)
+
+	logConfig := `
+outputs:
+  default:
+    type: elasticsearch
+    hosts:
+      - %s
+    preset: balanced
+    protocol: http
+agent.logging.level: %s
+agent.grpc.port: 6793
+agent.monitoring.enabled: true
+agent.logging.to_stderr: true
+agent.reload:
+  period: 1s
+`
+
+	esURL := integration.StartMockES(t, 0, 0, 0, 0)
+	// start with debug logs
+	cfg := fmt.Sprintf(logConfig, esURL, "debug")
+
+	require.NoError(t, fixture.Configure(ctx, []byte(cfg)))
+
+	cmd, err := fixture.PrepareAgentCommand(ctx, nil)
+	if err != nil {
+		t.Fatalf("cannot prepare Elastic-Agent command: %s", err)
+	}
+
+	observer, zapLogs := observer.New(zap.DebugLevel)
+	logger := zap.New(observer)
+	zapWriter := &ZapWriter{logger: logger, level: zap.InfoLevel}
+	cmd.Stderr = zapWriter
+	cmd.Stdout = zapWriter
+
+	require.NoError(t, cmd.Start())
+
+	require.Eventually(t, func() bool {
+		err = fixture.IsHealthy(ctx)
+		if err != nil {
+			t.Logf("waiting for agent healthy: %s", err.Error())
+			return false
+		}
+		return true
+	}, 30*time.Second, 1*time.Second)
+
+	// Make sure the Elastic-Agent process is not running before
+	// exiting the test
+	t.Cleanup(func() {
+		// Ignore the error because we cancelled the context,
+		// and that always returns an error
+		_ = cmd.Wait()
+		if t.Failed() {
+			t.Log("Elastic-Agent output:")
+			for _, entry := range zapLogs.All() {
+				t.Log(entry.Message)
+			}
+		}
+	})
+
+	require.Eventually(t, func() bool {
+		// we ensure OTel runtime inputs have started with correct level
+		// and not just agent logs
+		return (zapLogs.FilterMessageSnippet("otelcol.component.kind").FilterMessageSnippet(`"log.level":"debug"`).Len() > 1)
+	}, 1*time.Minute, 10*time.Second, "could not find debug logs")
+
+	// reset logs
+	zapLogs.TakeAll()
+
+	// set agent.logging.level: info
+	cfg = fmt.Sprintf(logConfig, esURL, "info")
+	require.NoError(t, fixture.Configure(ctx, []byte(cfg)))
+
+	// wait for elastic agent to be healthy and OTel collector to start
+	require.Eventually(t, func() bool {
+		err = fixture.IsHealthy(ctx)
+		if err != nil {
+			t.Logf("waiting for agent healthy: %s", err.Error())
+			return false
+		}
+		return zapLogs.FilterMessageSnippet("Everything is ready. Begin running and processing data").Len() > 0
+	}, 1*time.Minute, 10*time.Second, "elastic-agent was not healthy after log level changed to info")
+
+	// if debug level was enabled, we would find this message
+	require.Zero(t, zapLogs.FilterMessageSnippet(`Starting health check extension V2`).Len())
+
+	// set collector logs to debug
+	logConfig = logConfig + `
+service:
+  telemetry:
+    logs:
+      level: debug
+`
+
+	// add service::telemetry::logs::level: debug
+	cfg = fmt.Sprintf(logConfig, esURL, "info")
+	require.NoError(t, fixture.Configure(ctx, []byte(cfg)))
+
+	// reset zap logs
+	zapLogs.TakeAll()
+
+	// wait for elastic agent to be healthy and OTel collector to re-start
+	require.Eventually(t, func() bool {
+		err = fixture.IsHealthy(ctx)
+		if err != nil {
+			t.Logf("waiting for agent healthy: %s", err.Error())
+			return false
+		}
+		return zapLogs.FilterMessageSnippet("Everything is ready. Begin running and processing data").Len() > 0
+	}, 1*time.Minute, 10*time.Second, "elastic-agent is not healthy")
+
+	require.Eventually(t, func() bool {
+		// we ensure inputs have reloaded with correct level
+		// and not just agent logs
+		return (zapLogs.FilterMessageSnippet("otelcol.component.kind").FilterMessageSnippet(`"log.level":"debug"`).Len() > 1)
+	}, 1*time.Minute, 10*time.Second, "collector setting for log level was not given precedence")
+}
+
+type ZapWriter struct {
+	logger *zap.Logger
+	level  zapcore.Level
+}
+
+func (w *ZapWriter) Write(p []byte) (n int, err error) {
+	msg := strings.TrimSpace(string(p))
+	if msg != "" {
+		w.logger.Check(w.level, msg).Write()
+	}
+	return len(p), nil
+}
+>>>>>>> 85b7e9932 ((bugfix) log level does not change when standalone agent is reloaded or when otel runtime is used (#11998))
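The TestLogReloading change above relies on the zaptest observer pattern: the agent's stdout/stderr are piped through ZapWriter into an in-memory observer core, and the test then filters the captured entries by message snippet to decide whether debug-level output is present. The following is a minimal, standalone sketch of that pattern, not part of the patch; the lineWriter type and the sample messages are illustrative stand-ins, while the zap/observer calls are the real library API.

package main

import (
	"fmt"
	"strings"

	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
	"go.uber.org/zap/zaptest/observer"
)

// lineWriter forwards each chunk written to it to a zap logger at a fixed
// level, mirroring what ZapWriter does with the agent's output streams.
type lineWriter struct {
	logger *zap.Logger
	level  zapcore.Level
}

func (w *lineWriter) Write(p []byte) (int, error) {
	if msg := strings.TrimSpace(string(p)); msg != "" {
		if entry := w.logger.Check(w.level, msg); entry != nil {
			entry.Write()
		}
	}
	return len(p), nil
}

func main() {
	// The observer core keeps every logged entry in memory so it can be
	// queried after the fact, which is what the test's assertions rely on.
	core, captured := observer.New(zap.DebugLevel)
	w := &lineWriter{logger: zap.New(core), level: zap.InfoLevel}

	// Stand-ins for lines the agent would print on stderr.
	_, _ = w.Write([]byte(`{"log.level":"debug","message":"otelcol.component.kind"}`))
	_, _ = w.Write([]byte("Everything is ready. Begin running and processing data"))

	// Filter the captured output by message snippet, as the test does.
	fmt.Println(captured.FilterMessageSnippet(`"log.level":"debug"`).Len()) // 1
	fmt.Println(captured.FilterMessageSnippet("Everything is ready").Len()) // 1
}

FilterMessageSnippet matches on the entry message only, which is why the test writes each whole output line as a single log message before filtering.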