diff --git a/changelog/fragments/1766556100-reload-log-level.yaml b/changelog/fragments/1766556100-reload-log-level.yaml new file mode 100644 index 00000000000..9f2e86b3551 --- /dev/null +++ b/changelog/fragments/1766556100-reload-log-level.yaml @@ -0,0 +1,45 @@ +# REQUIRED +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: bug-fix + +# REQUIRED for all kinds +# Change summary; an 80ish characters long description of the change. +summary: Fix reloading agent.logging.level for standalone Elastic Agent + +# REQUIRED for breaking-change, deprecation, known-issue +# Long description; in case the summary is not enough to describe the change +# this field accommodates a description without length limits. +# description: + +# REQUIRED for breaking-change, deprecation, known-issue +# impact: + +# REQUIRED for breaking-change, deprecation, known-issue +# action: + +# REQUIRED for all kinds +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: elastic-agent + +# AUTOMATED +# OPTIONAL to manually add other PR URLs +# PR URL: A link to the PR that added the changeset. +# If not present, it is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. 
+# Please provide it if you are adding a fragment for a different PR. +# pr: https://github.com/owner/repo/1234 + +# AUTOMATED +# OPTIONAL to manually add other issue URLs +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +# issue: https://github.com/owner/repo/1234 diff --git a/internal/pkg/agent/application/application.go b/internal/pkg/agent/application/application.go index 1775cf39417..804be353055 100644 --- a/internal/pkg/agent/application/application.go +++ b/internal/pkg/agent/application/application.go @@ -270,7 +270,8 @@ func New( otelManager, err := otelmanager.NewOTelManager( log.Named("otel_manager"), - logLevel, baseLogger, + logLevel, + baseLogger, agentInfo, cfg.Settings.Collector, monitor.ComponentMonitoringConfig, diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index 7b577529cbc..aa81442e973 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -162,7 +162,7 @@ type OTelManager interface { Runner // Update updates the current plain configuration for the otel collector and components. - Update(*confmap.Conf, *monitoringCfg.MonitoringConfig, []component.Component) + Update(*confmap.Conf, *monitoringCfg.MonitoringConfig, logp.Level, []component.Component) // WatchCollector returns a channel to watch for collector status updates. 
WatchCollector() <-chan *status.AggregateStatus @@ -1674,10 +1674,20 @@ func (c *Coordinator) processConfigAgent(ctx context.Context, cfg *config.Config } c.currentCfg = currentCfg - if c.vars != nil { - return c.refreshComponentModel(ctx) + // check if log level has changed for standalone elastic-agent + // we'd have to update both the periodic and once config watchers and refactor initialization in application.go to do otherwise. + if c.agentInfo.IsStandalone() { + ll := currentCfg.Settings.LoggingConfig.Level + if ll != c.state.LogLevel { + // set log level for the coordinator + c.setLogLevel(ll) + // set global log level + logger.SetLevel(ll) + c.logger.Infof("log level changed to %s", ll.String()) + } } - return nil + + return c.refreshComponentModel(ctx) } // Generate the AST for a new incoming configuration and, if successful, @@ -1802,10 +1812,13 @@ func (c *Coordinator) processVars(ctx context.Context, vars []*transpiler.Vars) // Called on the main Coordinator goroutine. func (c *Coordinator) processLogLevel(ctx context.Context, ll logp.Level) { - c.setLogLevel(ll) - err := c.refreshComponentModel(ctx) - if err != nil { - c.logger.Errorf("updating log level: %s", err.Error()) + // do not refresh component model if log level did not change + if c.state.LogLevel != ll { + c.setLogLevel(ll) + err := c.refreshComponentModel(ctx) + if err != nil { + c.logger.Errorf("updating log level: %s", err.Error()) + } } } @@ -1872,7 +1885,7 @@ func (c *Coordinator) updateManagersWithConfig(model *component.Model) { } c.logger.With("component_ids", componentIDs).Info("Using OpenTelemetry collector runtime.") } - c.otelMgr.Update(c.otelCfg, c.currentCfg.Settings.MonitoringConfig, otelModel.Components) + c.otelMgr.Update(c.otelCfg, c.currentCfg.Settings.MonitoringConfig, c.state.LogLevel, otelModel.Components) } // splitModelBetweenManager splits the model components between the runtime manager and the otel manager. 
diff --git a/internal/pkg/agent/application/coordinator/coordinator_test.go b/internal/pkg/agent/application/coordinator/coordinator_test.go index f7a3fd491fd..72a54569aed 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_test.go @@ -1457,7 +1457,7 @@ func (f *fakeOTelManager) Errors() <-chan error { return f.errChan } -func (f *fakeOTelManager) Update(cfg *confmap.Conf, monitoring *monitoringCfg.MonitoringConfig, components []component.Component) { +func (f *fakeOTelManager) Update(cfg *confmap.Conf, monitoring *monitoringCfg.MonitoringConfig, ll logp.Level, components []component.Component) { var collectorResult, componentResult error if f.updateCollectorCallback != nil { collectorResult = f.updateCollectorCallback(cfg) diff --git a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go index 3fcebe52df5..2e0273fbfed 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go @@ -469,7 +469,9 @@ func TestCoordinatorReportsInvalidPolicy(t *testing.T) { }() tmpDir := t.TempDir() - upgradeMgr, err := upgrade.NewUpgrader(log, &artifact.Config{}, nil, &info.AgentInfo{}, new(upgrade.AgentWatcherHelper), ttl.NewTTLMarkerRegistry(nil, tmpDir)) + agentInfo, err := info.NewAgentInfo(ctx, false) + require.NoError(t, err) + upgradeMgr, err := upgrade.NewUpgrader(log, &artifact.Config{}, nil, agentInfo, new(upgrade.AgentWatcherHelper), ttl.NewTTLMarkerRegistry(nil, tmpDir)) require.NoError(t, err, "errored when creating a new upgrader") // Channels have buffer length 1, so we don't have to run on multiple @@ -502,6 +504,7 @@ func TestCoordinatorReportsInvalidPolicy(t *testing.T) { ast: emptyAST(t), componentPIDTicker: time.NewTicker(time.Second * 30), secretMarkerFunc: testSecretMarkerFunc, + agentInfo: agentInfo, } 
// Send an invalid config update and confirm that Coordinator reports @@ -594,6 +597,9 @@ func TestCoordinatorReportsComponentModelError(t *testing.T) { defer cancel() logger := logp.NewLogger("testing") + agentInfo, err := info.NewAgentInfo(t.Context(), false) + require.NoError(t, err) + // Channels have buffer length 1 so we don't have to run on multiple // goroutines. stateChan := make(chan State, 1) @@ -619,6 +625,7 @@ func TestCoordinatorReportsComponentModelError(t *testing.T) { ast: emptyAST(t), componentPIDTicker: time.NewTicker(time.Second * 30), secretMarkerFunc: testSecretMarkerFunc, + agentInfo: agentInfo, } // This configuration produces a valid AST but its EQL condition is diff --git a/internal/pkg/agent/application/periodic.go b/internal/pkg/agent/application/periodic.go index b03b6496278..8aec71494dc 100644 --- a/internal/pkg/agent/application/periodic.go +++ b/internal/pkg/agent/application/periodic.go @@ -17,6 +17,7 @@ import ( "github.com/elastic/elastic-agent/pkg/core/logger" ) +// periodic checks for local configuration changes type periodic struct { log *logger.Logger period time.Duration @@ -147,6 +148,7 @@ func newPeriodic( } } +// localConfigChange implements coordinator.ConfigChange for local file changes. type localConfigChange struct { cfg *config.Config } diff --git a/internal/pkg/agent/cmd/run.go b/internal/pkg/agent/cmd/run.go index 0f25c040539..71131d711c9 100644 --- a/internal/pkg/agent/cmd/run.go +++ b/internal/pkg/agent/cmd/run.go @@ -353,19 +353,17 @@ func runElasticAgent( errors.M(errors.MetaKeyPath, paths.AgentConfigFile())) } + // Set the initial log level (either default or from config file) + logger.SetLevel(logLvl) + // Ensure that the log level now matches what is configured in the agentInfo. 
- if agentInfo.LogLevel() != "" { - var lvl logp.Level - err = lvl.Unpack(agentInfo.LogLevel()) - if err != nil { - l.Error(errors.New(err, "failed to parse agent information log level")) - } else { - logLvl = lvl - logger.SetLevel(lvl) - } + var lvl logp.Level + err = lvl.Unpack(agentInfo.LogLevel()) + if err != nil { + l.Error(errors.New(err, "failed to parse agent information log level")) } else { - // Set the initial log level (either default or from config file) - logger.SetLevel(logLvl) + logLvl = lvl + logger.SetLevel(lvl) } // initiate agent watcher diff --git a/internal/pkg/otel/manager/execution.go b/internal/pkg/otel/manager/execution.go index 7bf91f05512..f3d425b1685 100644 --- a/internal/pkg/otel/manager/execution.go +++ b/internal/pkg/otel/manager/execution.go @@ -25,7 +25,7 @@ type collectorExecution interface { // - errCh: Process exit errors are sent to the errCh channel // - statusCh: Collector's status updates are sent to statusCh channel. // - forceFetchStatusCh: Channel that is used to trigger a forced status update. - startCollector(ctx context.Context, baseLogger *logger.Logger, logger *logger.Logger, cfg *confmap.Conf, errCh chan error, statusCh chan *status.AggregateStatus, forceFetchStatusCh chan struct{}) (collectorHandle, error) + startCollector(ctx context.Context, logLevel string, baseLogger *logger.Logger, logger *logger.Logger, cfg *confmap.Conf, errCh chan error, statusCh chan *status.AggregateStatus, forceFetchStatusCh chan struct{}) (collectorHandle, error) } type collectorHandle interface { diff --git a/internal/pkg/otel/manager/execution_subprocess.go b/internal/pkg/otel/manager/execution_subprocess.go index 3e64a670882..09a15e4841f 100644 --- a/internal/pkg/otel/manager/execution_subprocess.go +++ b/internal/pkg/otel/manager/execution_subprocess.go @@ -40,7 +40,7 @@ const ( // newSubprocessExecution creates a new execution which runs the otel collector in a subprocess. 
A metricsPort or // healthCheckPort of 0 will result in a random port being used. -func newSubprocessExecution(logLevel logp.Level, collectorPath string, uuid string, metricsPort int, healthCheckPort int) (*subprocessExecution, error) { +func newSubprocessExecution(collectorPath string, uuid string, metricsPort int, healthCheckPort int) (*subprocessExecution, error) { componentType, err := component.NewType(healthCheckExtensionName) if err != nil { return nil, fmt.Errorf("cannot create component type: %w", err) @@ -51,10 +51,8 @@ func newSubprocessExecution(logLevel logp.Level, collectorPath string, uuid stri collectorPath: collectorPath, collectorArgs: []string{ fmt.Sprintf("--%s", OtelSetSupervisedFlagName), - fmt.Sprintf("--%s=%s", OtelSupervisedLoggingLevelFlagName, logLevel.String()), fmt.Sprintf("--%s=%s", OtelSupervisedMonitoringURLFlagName, monitoring.EDOTMonitoringEndpoint()), }, - logLevel: logLevel, healthCheckExtensionID: healthCheckExtensionID, collectorMetricsPort: metricsPort, collectorHealthCheckPort: healthCheckPort, @@ -62,10 +60,10 @@ func newSubprocessExecution(logLevel logp.Level, collectorPath string, uuid stri }, nil } +// subprocessExecution implements collectorExecution by running the collector in a subprocess. type subprocessExecution struct { collectorPath string collectorArgs []string - logLevel logp.Level healthCheckExtensionID string collectorMetricsPort int collectorHealthCheckPort int @@ -74,7 +72,22 @@ type subprocessExecution struct { // startCollector starts a supervised collector and monitors its health. Process exit errors are sent to the // processErrCh channel. Other run errors, such as not able to connect to the health endpoint, are sent to the runErrCh channel. 
-func (r *subprocessExecution) startCollector(ctx context.Context, baseLogger *logger.Logger, logger *logger.Logger, cfg *confmap.Conf, processErrCh chan error, statusCh chan *status.AggregateStatus, forceFetchStatusCh chan struct{}) (collectorHandle, error) { +func (r *subprocessExecution) startCollector( + ctx context.Context, + logLevel string, + baseLogger *logger.Logger, + logger *logger.Logger, + cfg *confmap.Conf, + processErrCh chan error, + statusCh chan *status.AggregateStatus, + forceFetchStatusCh chan struct{}, +) (collectorHandle, error) { + var lvl logp.Level + err := lvl.Unpack(logLevel) + if err != nil { + return nil, fmt.Errorf("failed to unpack the log level '%s': %w", logLevel, err) + } + if cfg == nil { // configuration is required return nil, errors.New("no configuration provided") @@ -106,17 +119,21 @@ func (r *subprocessExecution) startCollector(ctx context.Context, baseLogger *lo } stdOutLast := newZapLast(baseLogger.Core()) - stdOut := runtimeLogger.NewLogWriterWithDefaults(stdOutLast, zapcore.Level(r.logLevel)) + stdOut := runtimeLogger.NewLogWriterWithDefaults(stdOutLast, zapcore.Level(lvl)) // info level for stdErr because by default collector writes to stderr stdErrLast := newZapLast(baseLogger.Core()) - stdErr := runtimeLogger.NewLogWriterWithDefaults(stdErrLast, zapcore.Level(r.logLevel)) + stdErr := runtimeLogger.NewLogWriterWithDefaults(stdErrLast, zapcore.Level(lvl)) procCtx, procCtxCancel := context.WithCancel(ctx) env := os.Environ() // Set the environment variable for the collector metrics port. See comment at the constant definition for more information. 
env = append(env, fmt.Sprintf("%s=%d", componentmonitoring.OtelCollectorMetricsPortEnvVarName, collectorMetricsPort)) + + // set collector args + collectorArgs := append(r.collectorArgs, fmt.Sprintf("--%s=%s", OtelSupervisedLoggingLevelFlagName, lvl)) + processInfo, err := process.Start(r.collectorPath, - process.WithArgs(r.collectorArgs), + process.WithArgs(collectorArgs), process.WithEnv(env), process.WithCmdOptions(func(c *exec.Cmd) error { c.Stdin = bytes.NewReader(confBytes) diff --git a/internal/pkg/otel/manager/manager.go b/internal/pkg/otel/manager/manager.go index 328b4a55fbb..0117c9cc2c4 100644 --- a/internal/pkg/otel/manager/manager.go +++ b/internal/pkg/otel/manager/manager.go @@ -64,6 +64,7 @@ type configUpdate struct { collectorCfg *confmap.Conf monitoringCfg *monitoringCfg.MonitoringConfig components []component.Component + logLevel logp.Level } // OTelManager is a manager that manages the lifecycle of the OTel collector inside of the Elastic Agent. @@ -125,6 +126,9 @@ type OTelManager struct { // stopTimeout is the timeout to wait for the collector to stop. stopTimeout time.Duration + + // log level of the collector + logLevel string } // NewOTelManager returns a OTelManager. 
@@ -166,7 +170,7 @@ func NewOTelManager( executable := filepath.Join(paths.Components(), collectorBinaryName) recoveryTimer = newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute) - exec, err = newSubprocessExecution(logLevel, executable, hcUUIDStr, collectorMetricsPort, collectorHealthCheckPort) + exec, err = newSubprocessExecution(executable, hcUUIDStr, collectorMetricsPort, collectorHealthCheckPort) if err != nil { return nil, fmt.Errorf("failed to create subprocess execution: %w", err) } @@ -188,6 +192,7 @@ func NewOTelManager( recoveryTimer: recoveryTimer, collectorRunErr: make(chan error), stopTimeout: stopTimeout, + logLevel: logLevel.String(), }, nil } @@ -229,7 +234,7 @@ func (m *OTelManager) Run(ctx context.Context) error { newRetries := m.recoveryRetries.Add(1) m.logger.Infof("collector recovery restarting, total retries: %d", newRetries) - m.proc, err = m.execution.startCollector(ctx, m.baseLogger, m.logger, m.mergedCollectorCfg, m.collectorRunErr, collectorStatusCh, forceFetchStatusCh) + m.proc, err = m.execution.startCollector(ctx, m.logLevel, m.baseLogger, m.logger, m.mergedCollectorCfg, m.collectorRunErr, collectorStatusCh, forceFetchStatusCh) if err != nil { // report a startup error (this gets reported as status) m.reportStartupErr(ctx, err) @@ -259,7 +264,7 @@ func (m *OTelManager) Run(ctx context.Context) error { // in this rare case the collector stopped running but a configuration was // provided and the collector stopped with a clean exit - m.proc, err = m.execution.startCollector(ctx, m.baseLogger, m.logger, m.mergedCollectorCfg, m.collectorRunErr, collectorStatusCh, forceFetchStatusCh) + m.proc, err = m.execution.startCollector(ctx, m.logLevel, m.baseLogger, m.logger, m.mergedCollectorCfg, m.collectorRunErr, collectorStatusCh, forceFetchStatusCh) if err != nil { // report a startup error (this gets reported as status) m.reportStartupErr(ctx, err) @@ -306,6 +311,17 @@ func (m *OTelManager) Run(ctx context.Context) error { 
configChanged, configUpdateErr := m.maybeUpdateMergedConfig(mergedCfg) m.collectorCfg = cfgUpdate.collectorCfg m.components = cfgUpdate.components + // set the log level defined in the service::telemetry::logs::level setting + if mergedCfg != nil && mergedCfg.IsSet("service::telemetry::logs::level") { + if logLevel, ok := mergedCfg.Get("service::telemetry::logs::level").(string); ok { + m.logLevel = logLevel + } else { + m.logger.Warn("failed to access log level from service::telemetry::logs::level") + } + } else { + // when the merged config does not set the level, fall back to the coordinator's log level + m.logLevel = cfgUpdate.logLevel.String() + } m.mx.Unlock() if configUpdateErr != nil { @@ -374,6 +390,12 @@ func buildMergedConfig( if err != nil { return nil, fmt.Errorf("failed to generate otel config: %w", err) } + + level := translate.GetOTelLogLevel(cfgUpdate.logLevel.String()) + if err := componentOtelCfg.Merge(confmap.NewFromStringMap(map[string]any{"service::telemetry::logs::level": level})); err != nil { + return nil, fmt.Errorf("failed to set log level in otel config: %w", err) + } + } // If both configs are nil, return nil so the manager knows to stop the collector @@ -516,7 +538,11 @@ func injectMonitoringReceiver( return config.Merge(confmap.NewFromStringMap(receiverCfg)) } -func (m *OTelManager) applyMergedConfig(ctx context.Context, collectorStatusCh chan *status.AggregateStatus, collectorRunErr chan error, forceFetchStatusCh chan struct{}) error { +func (m *OTelManager) applyMergedConfig(ctx context.Context, + collectorStatusCh chan *status.AggregateStatus, + collectorRunErr chan error, + forceFetchStatusCh chan struct{}, +) error { if m.proc != nil { m.proc.Stop(m.stopTimeout) m.proc = nil @@ -544,7 +570,7 @@ func (m *OTelManager) applyMergedConfig(ctx context.Context, collectorStatusCh c } else { // either a new configuration or the first configuration // that results in the collector being started - proc, err := m.execution.startCollector(ctx, m.baseLogger, m.logger, 
m.mergedCollectorCfg, collectorRunErr, collectorStatusCh, forceFetchStatusCh) + proc, err := m.execution.startCollector(ctx, m.logLevel, m.baseLogger, m.logger, m.mergedCollectorCfg, collectorRunErr, collectorStatusCh, forceFetchStatusCh) if err != nil { // failed to create the collector (this is different then // it's failing to run). we do not retry creation on failure @@ -564,11 +590,12 @@ func (m *OTelManager) applyMergedConfig(ctx context.Context, collectorStatusCh c } // Update sends collector configuration and component updates to the manager's run loop. -func (m *OTelManager) Update(cfg *confmap.Conf, monitoring *monitoringCfg.MonitoringConfig, components []component.Component) { +func (m *OTelManager) Update(cfg *confmap.Conf, monitoring *monitoringCfg.MonitoringConfig, ll logp.Level, components []component.Component) { cfgUpdate := configUpdate{ collectorCfg: cfg, monitoringCfg: monitoring, components: components, + logLevel: ll, } // we care only about the latest config update diff --git a/internal/pkg/otel/manager/manager_test.go b/internal/pkg/otel/manager/manager_test.go index 705a67b468a..e4a5ef35a04 100644 --- a/internal/pkg/otel/manager/manager_test.go +++ b/internal/pkg/otel/manager/manager_test.go @@ -91,12 +91,12 @@ type testExecution struct { handle collectorHandle } -func (e *testExecution) startCollector(ctx context.Context, baseLogger *logger.Logger, logger *logger.Logger, cfg *confmap.Conf, errCh chan error, statusCh chan *status.AggregateStatus, forceFetchStatusCh chan struct{}) (collectorHandle, error) { +func (e *testExecution) startCollector(ctx context.Context, level string, baseLogger *logger.Logger, logger *logger.Logger, cfg *confmap.Conf, errCh chan error, statusCh chan *status.AggregateStatus, forceFetchStatusCh chan struct{}) (collectorHandle, error) { e.mtx.Lock() defer e.mtx.Unlock() var err error - e.handle, err = e.exec.startCollector(ctx, baseLogger, logger, cfg, errCh, statusCh, forceFetchStatusCh) + e.handle, err = 
e.exec.startCollector(ctx, level, baseLogger, logger, cfg, errCh, statusCh, forceFetchStatusCh) return e.handle, err } @@ -118,6 +118,7 @@ type mockExecution struct { func (e *mockExecution) startCollector( ctx context.Context, + level string, _ *logger.Logger, _ *logger.Logger, cfg *confmap.Conf, @@ -355,26 +356,26 @@ func TestOTelManager_Run(t *testing.T) { if err != nil { return nil, fmt.Errorf("cannot generate UUID: %w", err) } - return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0) + return newSubprocessExecution(testBinary, hcUUID.String(), 0, 0) }, restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { // ensure that it got healthy cfg := confmap.NewFromStringMap(testConfig) updateTime := time.Now() - m.Update(cfg, nil, nil) + m.Update(cfg, nil, logp.InfoLevel, nil) e.EnsureHealthy(t, updateTime) // trigger update updateTime = time.Now() ok := cfg.Delete("service::telemetry::logs::level") // modify the config require.True(t, ok) - m.Update(cfg, nil, nil) + m.Update(cfg, nil, logp.InfoLevel, nil) e.EnsureHealthy(t, updateTime) // no configuration should stop the runner updateTime = time.Now() - m.Update(nil, nil, nil) + m.Update(nil, nil, logp.InfoLevel, nil) e.EnsureOffWithoutError(t, updateTime) assert.True(t, m.recoveryTimer.IsStopped(), "restart timer should be stopped") assert.Equal(t, uint32(0), m.recoveryRetries.Load(), "recovery retries should be 0") @@ -387,14 +388,14 @@ func TestOTelManager_Run(t *testing.T) { if err != nil { return nil, fmt.Errorf("cannot generate UUID: %w", err) } - return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0) + return newSubprocessExecution(testBinary, hcUUID.String(), 0, 0) }, restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), testFn: func(t *testing.T, m 
*OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { // ensure that it got healthy cfg := confmap.NewFromStringMap(testConfig) updateTime := time.Now() - m.Update(cfg, nil, nil) + m.Update(cfg, nil, logp.InfoLevel, nil) e.EnsureHealthy(t, updateTime) // stop it, this should be restarted by the manager @@ -407,7 +408,7 @@ func TestOTelManager_Run(t *testing.T) { // no configuration should stop the runner updateTime = time.Now() - m.Update(nil, nil, nil) + m.Update(nil, nil, logp.InfoLevel, nil) e.EnsureOffWithoutError(t, updateTime) assert.True(t, m.recoveryTimer.IsStopped(), "restart timer should be stopped") assert.Equal(t, uint32(0), m.recoveryRetries.Load(), "recovery retries should be 0") @@ -420,14 +421,14 @@ func TestOTelManager_Run(t *testing.T) { if err != nil { return nil, fmt.Errorf("cannot generate UUID: %w", err) } - return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0) + return newSubprocessExecution(testBinary, hcUUID.String(), 0, 0) }, restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { // ensure that it got healthy cfg := confmap.NewFromStringMap(testConfig) updateTime := time.Now() - m.Update(cfg, nil, nil) + m.Update(cfg, nil, logp.InfoLevel, nil) e.EnsureHealthy(t, updateTime) require.EqualValues(t, 0, countHealthCheckExtensionStatuses(e.getCollectorStatus()), "health check extension status count should be 0") @@ -454,7 +455,7 @@ func TestOTelManager_Run(t *testing.T) { // no configuration should stop the runner updateTime = time.Now() - m.Update(nil, nil, nil) + m.Update(nil, nil, logp.InfoLevel, nil) e.EnsureOffWithoutError(t, updateTime) assert.True(t, m.recoveryTimer.IsStopped(), "restart timer should be stopped") assert.Equal(t, uint32(3), seenRecoveredTimes, "recovery 
retries should be 3") @@ -467,7 +468,7 @@ func TestOTelManager_Run(t *testing.T) { if err != nil { return nil, fmt.Errorf("cannot generate UUID: %w", err) } - return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0) + return newSubprocessExecution(testBinary, hcUUID.String(), 0, 0) }, restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { @@ -478,7 +479,7 @@ func TestOTelManager_Run(t *testing.T) { }) cfg := confmap.NewFromStringMap(testConfig) - m.Update(cfg, nil, nil) + m.Update(cfg, nil, logp.InfoLevel, nil) seenRecoveredTimes := uint32(0) require.Eventually(t, func() bool { @@ -493,7 +494,7 @@ func TestOTelManager_Run(t *testing.T) { // no configuration should stop the runner updateTime = time.Now() - m.Update(nil, nil, nil) + m.Update(nil, nil, logp.InfoLevel, nil) e.EnsureOffWithoutError(t, updateTime) require.True(t, m.recoveryTimer.IsStopped(), "restart timer should be stopped") assert.GreaterOrEqual(t, uint32(3), seenRecoveredTimes, "recovery retries should be 3") @@ -506,7 +507,7 @@ func TestOTelManager_Run(t *testing.T) { if err != nil { return nil, fmt.Errorf("cannot generate UUID: %w", err) } - return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0) + return newSubprocessExecution(testBinary, hcUUID.String(), 0, 0) }, restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { @@ -518,7 +519,7 @@ func TestOTelManager_Run(t *testing.T) { }) cfg := confmap.NewFromStringMap(testConfig) - m.Update(cfg, nil, nil) + m.Update(cfg, nil, logp.InfoLevel, nil) // ensure that it reports a generic fatal error for all components, a panic cannot be assigned to // a specific 
component in the collector @@ -551,7 +552,7 @@ func TestOTelManager_Run(t *testing.T) { if err != nil { return nil, fmt.Errorf("cannot generate UUID: %w", err) } - subprocessExec, err := newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0) + subprocessExec, err := newSubprocessExecution(testBinary, hcUUID.String(), 0, 0) if err != nil { return nil, err } @@ -575,7 +576,7 @@ func TestOTelManager_Run(t *testing.T) { // ensure that it got healthy cfg := confmap.NewFromStringMap(testConfig) updateTime := time.Now() - m.Update(cfg, nil, nil) + m.Update(cfg, nil, logp.InfoLevel, nil) e.EnsureHealthy(t, updateTime) // stop the manager to simulate that elastic-agent is shutting down @@ -608,7 +609,7 @@ func TestOTelManager_Run(t *testing.T) { if err != nil { return nil, fmt.Errorf("cannot generate UUID: %w", err) } - subprocessExec, err := newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0) + subprocessExec, err := newSubprocessExecution(testBinary, hcUUID.String(), 0, 0) if err != nil { return nil, err } @@ -632,7 +633,7 @@ func TestOTelManager_Run(t *testing.T) { // ensure that it got healthy cfg := confmap.NewFromStringMap(testConfig) updateTime := time.Now() - m.Update(cfg, nil, nil) + m.Update(cfg, nil, logp.InfoLevel, nil) e.EnsureHealthy(t, updateTime) // stop the manager to simulate that elastic-agent is shutting down @@ -665,7 +666,7 @@ func TestOTelManager_Run(t *testing.T) { if err != nil { return nil, fmt.Errorf("cannot generate UUID: %w", err) } - return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0) + return newSubprocessExecution(testBinary, hcUUID.String(), 0, 0) }, restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { @@ -691,7 +692,7 @@ func TestOTelManager_Run(t *testing.T) { require.NoError(t, err, "failed to 
inject user health extension") updateTime := time.Now() - m.Update(cfg, nil, nil) + m.Update(cfg, nil, logp.InfoLevel, nil) e.EnsureHealthy(t, updateTime) require.EqualValues(t, 1, countHealthCheckExtensionStatuses(e.getCollectorStatus()), "health check extension status count should be 1") @@ -704,7 +705,7 @@ func TestOTelManager_Run(t *testing.T) { if err != nil { return nil, fmt.Errorf("cannot generate UUID: %w", err) } - return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0) + return newSubprocessExecution(testBinary, hcUUID.String(), 0, 0) }, restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), skipListeningErrors: true, @@ -719,7 +720,7 @@ func TestOTelManager_Run(t *testing.T) { // // this does give a good test of a truly invalid configuration cfg := confmap.New() // empty config - m.Update(cfg, nil, nil) + m.Update(cfg, nil, logp.InfoLevel, nil) // delay between updates to ensure the collector will have to fail <-time.After(100 * time.Millisecond) @@ -755,7 +756,7 @@ func TestOTelManager_Run(t *testing.T) { if err != nil { return nil, fmt.Errorf("cannot generate UUID: %w", err) } - return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0) + return newSubprocessExecution(testBinary, hcUUID.String(), 0, 0) }, restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { @@ -778,7 +779,7 @@ func TestOTelManager_Run(t *testing.T) { }, }, }) - m.Update(cfg, nil, nil) + m.Update(cfg, nil, logp.InfoLevel, nil) e.EnsureFatal(t, time.Now().Add(time.Second), func(collectT *assert.CollectT, _ *EventTime[error], latestStatus *EventTime[*status.AggregateStatus]) { status := latestStatus.Value() @@ -819,6 +820,7 @@ func TestOTelManager_Run(t *testing.T) { collectorRunErr: make(chan error), recoveryTimer: tc.restarter, 
stopTimeout: waitTimeForStop, + agentInfo: &info.AgentInfo{}, } executionMode, err := tc.execModeFn(m.collectorRunErr) @@ -893,13 +895,13 @@ func TestOTelManager_Logging(t *testing.T) { if err != nil { return nil, fmt.Errorf("cannot generate UUID: %w", err) } - return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0) + return newSubprocessExecution(testBinary, hcUUID.String(), 0, 0) }, }, } { t.Run(tc.name, func(t *testing.T) { // the execution mode passed here is overridden below so it is irrelevant - m, err := NewOTelManager(l, logp.DebugLevel, base, nil, nil, nil, waitTimeForStop) + m, err := NewOTelManager(l, logp.InfoLevel, base, &info.AgentInfo{}, nil, nil, waitTimeForStop) require.NoError(t, err, "could not create otel manager") executionMode, err := tc.execModeFn(m.collectorRunErr) @@ -924,7 +926,7 @@ func TestOTelManager_Logging(t *testing.T) { }() cfg := confmap.NewFromStringMap(testConfig) - m.Update(cfg, nil, nil) + m.Update(cfg, nil, logp.InfoLevel, nil) // the collector should log to the base logger assert.EventuallyWithT(t, func(collect *assert.CollectT) { @@ -970,7 +972,7 @@ func TestOTelManager_Ports(t *testing.T) { if err != nil { return nil, fmt.Errorf("cannot generate UUID: %w", err) } - return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), metricsPort, healthCheckPort) + return newSubprocessExecution(testBinary, hcUUID.String(), metricsPort, healthCheckPort) }, healthCheckEnabled: true, }, @@ -991,9 +993,9 @@ func TestOTelManager_Ports(t *testing.T) { // the execution mode passed here is overridden below so it is irrelevant m, err := NewOTelManager( l, - logp.DebugLevel, + logp.InfoLevel, base, - nil, + &info.AgentInfo{}, &agentCollectorConfig, nil, waitTimeForStop, @@ -1024,7 +1026,7 @@ func TestOTelManager_Ports(t *testing.T) { cfg := confmap.NewFromStringMap(testConfig) cfg.Delete("service::telemetry::metrics::level") // change this to default - m.Update(cfg, nil, nil) + m.Update(cfg, nil, 
logp.InfoLevel, nil) // wait until status reflects the config update require.EventuallyWithT(t, func(collect *assert.CollectT) { @@ -1114,15 +1116,15 @@ func TestOTelManager_PortConflict(t *testing.T) { // the execution mode passed here is overridden below so it is irrelevant m, err := NewOTelManager( l, - logp.DebugLevel, + logp.InfoLevel, base, - nil, + &info.AgentInfo{}, nil, nil, waitTimeForStop, ) require.NoError(t, err, "could not create otel manager") - executionMode, err := newSubprocessExecution(logp.DebugLevel, testBinary, strings.TrimPrefix(m.healthCheckExtID, "extension:healthcheckv2/"), 0, 0) + executionMode, err := newSubprocessExecution(testBinary, strings.TrimPrefix(m.healthCheckExtID, "extension:healthcheckv2/"), 0, 0) require.NoError(t, err, "could not create subprocess execution mode") m.execution = executionMode @@ -1148,7 +1150,7 @@ func TestOTelManager_PortConflict(t *testing.T) { // no retries, collector is not running assert.Equal(t, uint32(0), m.recoveryRetries.Load()) - m.Update(cfg, nil, nil) + m.Update(cfg, nil, logp.InfoLevel, nil) // wait until status reflects the config update require.EventuallyWithT(t, func(collect *assert.CollectT) { @@ -1223,35 +1225,41 @@ func TestOTelManager_buildMergedConfig(t *testing.T) { components []component.Component expectedKeys []string expectedErrorString string + expectedLogLevel string }{ { - name: "nil config returns nil", - collectorCfg: nil, - components: nil, + name: "nil config returns nil", + collectorCfg: nil, + components: nil, + expectedLogLevel: "", }, { - name: "empty config returns empty config", - collectorCfg: nil, - components: nil, - expectedKeys: []string{}, + name: "empty config returns empty config", + collectorCfg: nil, + components: nil, + expectedKeys: []string{}, + expectedLogLevel: "", }, { - name: "collector config only", - collectorCfg: confmap.NewFromStringMap(testConfig), - components: nil, - expectedKeys: []string{"receivers", "exporters", "service", "processors"}, + name: 
"collector config only", + collectorCfg: confmap.NewFromStringMap(testConfig), + components: nil, + expectedKeys: []string{"receivers", "exporters", "service", "processors"}, + expectedLogLevel: "info", }, { - name: "components only", - collectorCfg: nil, - components: []component.Component{testComp}, - expectedKeys: []string{"receivers", "exporters", "service"}, + name: "components only", + collectorCfg: nil, + components: []component.Component{testComp}, + expectedKeys: []string{"receivers", "exporters", "service"}, + expectedLogLevel: "DEBUG", }, { - name: "both collector config and components", - collectorCfg: confmap.NewFromStringMap(testConfig), - components: []component.Component{testComp}, - expectedKeys: []string{"receivers", "exporters", "service", "processors"}, + name: "both collector config and components", + collectorCfg: confmap.NewFromStringMap(testConfig), + components: []component.Component{testComp}, + expectedKeys: []string{"receivers", "exporters", "service", "processors"}, + expectedLogLevel: "info", }, { name: "component config generation error", @@ -1263,6 +1271,7 @@ func TestOTelManager_buildMergedConfig(t *testing.T) { // Missing InputSpec which should cause an error during config generation }}, expectedErrorString: "failed to generate otel config: unknown otel receiver type for input type: filestream", + expectedLogLevel: "", }, } @@ -1271,6 +1280,7 @@ func TestOTelManager_buildMergedConfig(t *testing.T) { cfgUpdate := configUpdate{ collectorCfg: tt.collectorCfg, components: tt.components, + logLevel: logp.DebugLevel, } result, err := buildMergedConfig(cfgUpdate, commonAgentInfo, commonBeatMonitoringConfigGetter, logptest.NewTestingLogger(t, "")) @@ -1288,6 +1298,11 @@ func TestOTelManager_buildMergedConfig(t *testing.T) { return } + // assert log level provided by user is given precedence. 
+ if tt.expectedLogLevel != "" { + assert.Equal(t, tt.expectedLogLevel, result.Get("service::telemetry::logs::level")) + } + require.NotNil(t, result) for _, key := range tt.expectedKeys { assert.True(t, result.IsSet(key), "Expected key %s to be set", key) @@ -1602,7 +1617,7 @@ func TestOTelManagerEndToEnd(t *testing.T) { components := []component.Component{testComp} t.Run("collector config is passed down to the collector execution", func(t *testing.T) { - mgr.Update(collectorCfg, nil, nil) + mgr.Update(collectorCfg, nil, logp.InfoLevel, nil) select { case <-collectorStarted: case <-ctx.Done(): @@ -1635,7 +1650,7 @@ func TestOTelManagerEndToEnd(t *testing.T) { }) t.Run("component config is passed down to the otel manager", func(t *testing.T) { - mgr.Update(collectorCfg, nil, components) + mgr.Update(collectorCfg, nil, logp.InfoLevel, components) select { case <-collectorStarted: case <-ctx.Done(): @@ -1651,7 +1666,7 @@ func TestOTelManagerEndToEnd(t *testing.T) { }) t.Run("empty collector config leaves the component config running", func(t *testing.T) { - mgr.Update(nil, nil, components) + mgr.Update(nil, nil, logp.InfoLevel, components) select { case <-collectorStarted: case <-ctx.Done(): @@ -1751,7 +1766,6 @@ func TestOTelManagerEndToEnd(t *testing.T) { func TestManagerAlwaysEmitsStoppedStatesForComponents(t *testing.T) { // Setup test logger and dependencies testLogger, _ := loggertest.New("test") - agentInfo := &info.AgentInfo{} beatMonitoringConfigGetter := mockBeatMonitoringConfigGetter collectorStarted := make(chan struct{}) @@ -1762,9 +1776,9 @@ func TestManagerAlwaysEmitsStoppedStatesForComponents(t *testing.T) { // Create manager with test dependencies mgr, err := NewOTelManager( testLogger, - logp.DebugLevel, + logp.InfoLevel, testLogger, - agentInfo, + &info.AgentInfo{}, nil, beatMonitoringConfigGetter, time.Second, @@ -1802,7 +1816,7 @@ func TestManagerAlwaysEmitsStoppedStatesForComponents(t *testing.T) { }, } // start the collector by giving it a mock 
config - mgr.Update(nil, nil, components) + mgr.Update(nil, nil, logp.InfoLevel, components) select { case <-ctx.Done(): t.Fatal("timeout waiting for collector status update") @@ -1825,7 +1839,7 @@ func TestManagerAlwaysEmitsStoppedStatesForComponents(t *testing.T) { assert.Equal(t, componentState.State.State, client.UnitStateHealthy) // stop the component by sending a nil config - mgr.Update(nil, nil, nil) + mgr.Update(nil, nil, logp.InfoLevel, nil) // then send a nil status, indicating the collector is not running the component anymore // do this a few times to see if the STOPPED state isn't lost along the way @@ -1858,7 +1872,7 @@ func TestManagerEmitsStartingStatesWhenHealthcheckIsUnavailable(t *testing.T) { // Create manager with test dependencies mgr, err := NewOTelManager( testLogger, - logp.DebugLevel, + logp.InfoLevel, testLogger, agentInfo, nil, @@ -1884,7 +1898,7 @@ func TestManagerEmitsStartingStatesWhenHealthcheckIsUnavailable(t *testing.T) { Event: componentstatus.NewEvent(componentstatus.StatusStarting), } // start the collector by giving it a mock config - mgr.Update(nil, nil, components) + mgr.Update(nil, nil, logp.InfoLevel, components) select { case <-ctx.Done(): t.Fatal("timeout waiting for collector status update") @@ -1908,7 +1922,7 @@ func TestManagerEmitsStartingStatesWhenHealthcheckIsUnavailable(t *testing.T) { assert.Equal(t, componentState.State.Message, "STARTING") // stop the component by sending a nil config - mgr.Update(nil, nil, nil) + mgr.Update(nil, nil, logp.InfoLevel, nil) // then send a nil status, indicating the collector is not running the component anymore // do this a few times to see if the STOPPED state isn't lost along the way diff --git a/internal/pkg/otel/translate/otelconfig.go b/internal/pkg/otel/translate/otelconfig.go index a4d2a2820cb..895b74bd1c2 100644 --- a/internal/pkg/otel/translate/otelconfig.go +++ b/internal/pkg/otel/translate/otelconfig.go @@ -145,6 +145,24 @@ func GetOtelConfig( return otelConfig, nil } 
+func GetOTelLogLevel(level string) string { + if level != "" { + switch strings.ToLower(level) { + case "debug": + return "DEBUG" + case "info": + return "INFO" + case "warning": + return "WARN" + case "error": + return "ERROR" + default: + return "INFO" + } + } + return "INFO" +} + // VerifyComponentIsOtelSupported verifies that the given component can be run in an Otel Collector. It returns an error // indicating what the problem is, if it can't. func VerifyComponentIsOtelSupported(comp *component.Component) error { diff --git a/testing/integration/ess/otel_test.go b/testing/integration/ess/otel_test.go index ad79dcd1cda..e57d3757571 100644 --- a/testing/integration/ess/otel_test.go +++ b/testing/integration/ess/otel_test.go @@ -22,7 +22,9 @@ import ( "text/template" "time" - "github.com/elastic/elastic-agent/pkg/control/v2/cproto" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" @@ -32,6 +34,7 @@ import ( "github.com/elastic/elastic-agent-libs/testing/estools" "github.com/elastic/elastic-agent-libs/transport/tlscommontest" "github.com/elastic/elastic-agent/pkg/control/v2/client" + "github.com/elastic/elastic-agent/pkg/control/v2/cproto" aTesting "github.com/elastic/elastic-agent/pkg/testing" "github.com/elastic/elastic-agent/pkg/testing/define" "github.com/elastic/elastic-agent/pkg/testing/tools/testcontext" @@ -2038,3 +2041,161 @@ agent.internal.runtime.metricbeat: combinedOutput, err := fixture.Uninstall(ctx, &aTesting.UninstallOpts{Force: true}) require.NoErrorf(t, err, "error uninstalling classic agent monitoring, err: %s, combined output: %s", err, string(combinedOutput)) } + +// This tests that live reloading the log level works correctly +func TestLogReloading(t *testing.T) { + define.Require(t, define.Requirements{ + Group: integration.Default, + Local: true, + Stack: &define.Stack{}, + }) + + // Flow of the test + // 1. 
Start elastic-agent with debug logs + // 2. Change the log level to info without restarting + // 3. Ensure no debug logs are printed + // 4. Set service::telemetry::logs::level: debug + // 5. Ensure service::telemetry::logs::level is given precedence even when agent logs are set to info + + // Create the otel configuration file + type otelConfigOptions struct { + ESEndpoint string + ESApiKey string + Index string + CAFile string + } + + fixture, err := define.NewFixtureFromLocalBuild(t, define.Version()) + require.NoError(t, err) + + ctx, cancel := testcontext.WithDeadline(t, t.Context(), time.Now().Add(5*time.Minute)) + defer cancel() + err = fixture.Prepare(ctx) + require.NoError(t, err) + + logConfig := ` +outputs: + default: + type: elasticsearch + hosts: + - %s + preset: balanced + protocol: http +agent.logging.level: %s +agent.grpc.port: 6793 +agent.monitoring.enabled: true +agent.logging.to_stderr: true +agent.reload: + period: 1s +` + + esURL := integration.StartMockES(t, 0, 0, 0, 0) + // start with debug logs + cfg := fmt.Sprintf(logConfig, esURL, "debug") + + require.NoError(t, fixture.Configure(ctx, []byte(cfg))) + + cmd, err := fixture.PrepareAgentCommand(ctx, nil) + if err != nil { + t.Fatalf("cannot prepare Elastic-Agent command: %s", err) + } + + observer, zapLogs := observer.New(zap.DebugLevel) + logger := zap.New(observer) + zapWriter := &ZapWriter{logger: logger, level: zap.InfoLevel} + cmd.Stderr = zapWriter + cmd.Stdout = zapWriter + + require.NoError(t, cmd.Start()) + + require.Eventually(t, func() bool { + err = fixture.IsHealthy(ctx) + if err != nil { + t.Logf("waiting for agent healthy: %s", err.Error()) + return false + } + return true + }, 30*time.Second, 1*time.Second) + + // Make sure the Elastic-Agent process is not running before + // exiting the test + t.Cleanup(func() { + // Ignore the error because we cancelled the context, + // and that always returns an error + _ = cmd.Wait() + if t.Failed() { + t.Log("Elastic-Agent output:") + 
zapLogs.All() + } + }) + + require.Eventually(t, func() bool { + // we ensure OTel runtime inputs have started with correct level + // and not just agent logs + return (zapLogs.FilterMessageSnippet("otelcol.component.kind").FilterMessageSnippet(`"log.level":"debug"`).Len() > 1) + }, 1*time.Minute, 10*time.Second, "could not find debug logs") + + // reset logs + zapLogs.TakeAll() + + // set agent.logging.level: info + cfg = fmt.Sprintf(logConfig, esURL, "info") + require.NoError(t, fixture.Configure(ctx, []byte(cfg))) + + // wait for elastic agent to be healthy and OTel collector to start + require.Eventually(t, func() bool { + err = fixture.IsHealthy(ctx) + if err != nil { + t.Logf("waiting for agent healthy: %s", err.Error()) + return false + } + return zapLogs.FilterMessageSnippet("Everything is ready. Begin running and processing data").Len() > 0 + }, 1*time.Minute, 10*time.Second, "elastic-agent was not healthy after log level changed to info") + + // if debug level was enabled, we would find this message + require.Zero(t, zapLogs.FilterMessageSnippet(`Starting health check extension V2`).Len()) + + // set collector logs to debug + logConfig = logConfig + ` +service: + telemetry: + logs: + level: debug +` + + // add service::telemetry::logs::level:debug + cfg = fmt.Sprintf(logConfig, esURL, "info") + require.NoError(t, fixture.Configure(ctx, []byte(cfg))) + + // reset zap logs + zapLogs.TakeAll() + + // wait for elastic agent to be healthy and OTel collector to re-start + require.Eventually(t, func() bool { + err = fixture.IsHealthy(ctx) + if err != nil { + t.Logf("waiting for agent healthy: %s", err.Error()) + return false + } + return zapLogs.FilterMessageSnippet("Everything is ready. 
Begin running and processing data").Len() > 0 + }, 1*time.Minute, 10*time.Second, "elastic-agent is not healthy") + + require.Eventually(t, func() bool { + // we ensure inputs have reloaded with correct level + // and not just agent logs + return (zapLogs.FilterMessageSnippet("otelcol.component.kind").FilterMessageSnippet(`"log.level":"debug"`).Len() > 1) + }, 1*time.Minute, 10*time.Second, "collector setting for log level was not given precedence") +} + +type ZapWriter struct { + logger *zap.Logger + level zapcore.Level +} + +func (w *ZapWriter) Write(p []byte) (n int, err error) { + msg := strings.TrimSpace(string(p)) + if msg != "" { + w.logger.Check(w.level, msg).Write() + } + return len(p), nil +}