From 579cb92a0f289945fb3b35c0954c7b42e87022df Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Tue, 23 Dec 2025 17:24:17 +0530 Subject: [PATCH 01/25] Fix log level does not change when standalone agent is reloaded --- .../application/coordinator/coordinator.go | 24 ++++++++++--- internal/pkg/otel/manager/execution.go | 2 +- .../pkg/otel/manager/execution_subprocess.go | 34 +++++++++++++++++-- internal/pkg/otel/manager/manager.go | 13 +++++-- internal/pkg/otel/manager/manager_test.go | 5 +-- internal/pkg/otel/translate/otelconfig.go | 18 ++++++++++ 6 files changed, 84 insertions(+), 12 deletions(-) diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index 0e8be696959..ee18a70e058 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -1673,6 +1673,19 @@ func (c *Coordinator) processConfigAgent(ctx context.Context, cfg *config.Config } c.currentCfg = currentCfg + // check if log level has changed in standalone agent config + ll := currentCfg.Settings.LoggingConfig.Level + if ll != c.state.LogLevel { + // set log level for the coordinator + c.setLogLevel(ll) + // set global log level + logger.SetLevel(logp.Level(ll)) + // set agent log level. + // this is used by other parts of the agent to report the log level eg. otel manager + c.agentInfo.SetLogLevel(ctx, ll.String()) + c.logger.Infof("log level changed to %s", ll.String()) + } + if c.vars != nil { return c.refreshComponentModel(ctx) } @@ -1801,10 +1814,13 @@ func (c *Coordinator) processVars(ctx context.Context, vars []*transpiler.Vars) // Called on the main Coordinator goroutine. 
func (c *Coordinator) processLogLevel(ctx context.Context, ll logp.Level) { - c.setLogLevel(ll) - err := c.refreshComponentModel(ctx) - if err != nil { - c.logger.Errorf("updating log level: %s", err.Error()) + // do not refresh component model if log level did not change + if c.state.LogLevel != ll { + c.setLogLevel(ll) + err := c.refreshComponentModel(ctx) + if err != nil { + c.logger.Errorf("updating log level: %s", err.Error()) + } } } diff --git a/internal/pkg/otel/manager/execution.go b/internal/pkg/otel/manager/execution.go index 7bf91f05512..f3d425b1685 100644 --- a/internal/pkg/otel/manager/execution.go +++ b/internal/pkg/otel/manager/execution.go @@ -25,7 +25,7 @@ type collectorExecution interface { // - errCh: Process exit errors are sent to the errCh channel // - statusCh: Collector's status updates are sent to statusCh channel. // - forceFetchStatusCh: Channel that is used to trigger a forced status update. - startCollector(ctx context.Context, baseLogger *logger.Logger, logger *logger.Logger, cfg *confmap.Conf, errCh chan error, statusCh chan *status.AggregateStatus, forceFetchStatusCh chan struct{}) (collectorHandle, error) + startCollector(ctx context.Context, logLevel string, baseLogger *logger.Logger, logger *logger.Logger, cfg *confmap.Conf, errCh chan error, statusCh chan *status.AggregateStatus, forceFetchStatusCh chan struct{}) (collectorHandle, error) } type collectorHandle interface { diff --git a/internal/pkg/otel/manager/execution_subprocess.go b/internal/pkg/otel/manager/execution_subprocess.go index 3e64a670882..1dadc6e975a 100644 --- a/internal/pkg/otel/manager/execution_subprocess.go +++ b/internal/pkg/otel/manager/execution_subprocess.go @@ -62,6 +62,7 @@ func newSubprocessExecution(logLevel logp.Level, collectorPath string, uuid stri }, nil } +// subprocessExecution implements collectorExecution by running the collector in a subprocess. 
type subprocessExecution struct { collectorPath string collectorArgs []string @@ -74,7 +75,20 @@ type subprocessExecution struct { // startCollector starts a supervised collector and monitors its health. Process exit errors are sent to the // processErrCh channel. Other run errors, such as not able to connect to the health endpoint, are sent to the runErrCh channel. -func (r *subprocessExecution) startCollector(ctx context.Context, baseLogger *logger.Logger, logger *logger.Logger, cfg *confmap.Conf, processErrCh chan error, statusCh chan *status.AggregateStatus, forceFetchStatusCh chan struct{}) (collectorHandle, error) { +func (r *subprocessExecution) startCollector( + ctx context.Context, + logLevel string, + baseLogger *logger.Logger, + logger *logger.Logger, + cfg *confmap.Conf, + processErrCh chan error, + statusCh chan *status.AggregateStatus, + forceFetchStatusCh chan struct{}, +) (collectorHandle, error) { + if err := r.setLogLevelFromString(logLevel); err != nil { + return nil, fmt.Errorf("failed to set log level for otel collector: %w", err) + } + if cfg == nil { // configuration is required return nil, errors.New("no configuration provided") @@ -115,8 +129,12 @@ func (r *subprocessExecution) startCollector(ctx context.Context, baseLogger *lo env := os.Environ() // Set the environment variable for the collector metrics port. See comment at the constant definition for more information. 
env = append(env, fmt.Sprintf("%s=%d", componentmonitoring.OtelCollectorMetricsPortEnvVarName, collectorMetricsPort)) + + // set collector args + collectorArgs := append(r.collectorArgs, fmt.Sprintf("--%s=%s", OtelSupervisedLoggingLevelFlagName, r.logLevel.String())) + processInfo, err := process.Start(r.collectorPath, - process.WithArgs(r.collectorArgs), + process.WithArgs(collectorArgs), process.WithEnv(env), process.WithCmdOptions(func(c *exec.Cmd) error { c.Stdin = bytes.NewReader(confBytes) @@ -241,6 +259,18 @@ func (r *subprocessExecution) startCollector(ctx context.Context, baseLogger *lo return ctl, nil } +func (r *subprocessExecution) setLogLevelFromString(logLevel string) error { + var lvl logp.Level + err := lvl.Unpack(logLevel) + if err != nil { + return fmt.Errorf("invalid log level '%s': %w", logLevel, err) + } + if r.logLevel != lvl { + r.logLevel = lvl + } + return nil +} + // cloneCollectorStatus creates a deep copy of the provided AggregateStatus. func cloneCollectorStatus(aStatus *status.AggregateStatus) *status.AggregateStatus { if aStatus == nil { diff --git a/internal/pkg/otel/manager/manager.go b/internal/pkg/otel/manager/manager.go index 33d50c51af1..b738b5b9edf 100644 --- a/internal/pkg/otel/manager/manager.go +++ b/internal/pkg/otel/manager/manager.go @@ -223,7 +223,7 @@ func (m *OTelManager) Run(ctx context.Context) error { newRetries := m.recoveryRetries.Add(1) m.logger.Infof("collector recovery restarting, total retries: %d", newRetries) - m.proc, err = m.execution.startCollector(ctx, m.baseLogger, m.logger, m.mergedCollectorCfg, m.collectorRunErr, collectorStatusCh, forceFetchStatusCh) + m.proc, err = m.execution.startCollector(ctx, m.agentInfo.LogLevel(), m.baseLogger, m.logger, m.mergedCollectorCfg, m.collectorRunErr, collectorStatusCh, forceFetchStatusCh) if err != nil { // report a startup error (this gets reported as status) m.reportStartupErr(ctx, err) @@ -253,7 +253,7 @@ func (m *OTelManager) Run(ctx context.Context) error { // 
in this rare case the collector stopped running but a configuration was // provided and the collector stopped with a clean exit - m.proc, err = m.execution.startCollector(ctx, m.baseLogger, m.logger, m.mergedCollectorCfg, m.collectorRunErr, collectorStatusCh, forceFetchStatusCh) + m.proc, err = m.execution.startCollector(ctx, m.agentInfo.LogLevel(), m.baseLogger, m.logger, m.mergedCollectorCfg, m.collectorRunErr, collectorStatusCh, forceFetchStatusCh) if err != nil { // report a startup error (this gets reported as status) m.reportStartupErr(ctx, err) @@ -368,6 +368,13 @@ func buildMergedConfig( if err != nil { return nil, fmt.Errorf("failed to generate otel config: %w", err) } + + // get log level from agent info + level := translate.GetOTelLogLevel(agentInfo.LogLevel()) + if err := componentOtelCfg.Merge(confmap.NewFromStringMap(map[string]any{"service::telemetry::logs::level": level})); err != nil { + return nil, fmt.Errorf("failed to set log level in otel config: %w", err) + } + } // If both configs are nil, return nil so the manager knows to stop the collector @@ -451,7 +458,7 @@ func (m *OTelManager) applyMergedConfig(ctx context.Context, collectorStatusCh c } else { // either a new configuration or the first configuration // that results in the collector being started - proc, err := m.execution.startCollector(ctx, m.baseLogger, m.logger, m.mergedCollectorCfg, collectorRunErr, collectorStatusCh, forceFetchStatusCh) + proc, err := m.execution.startCollector(ctx, m.agentInfo.LogLevel(), m.baseLogger, m.logger, m.mergedCollectorCfg, collectorRunErr, collectorStatusCh, forceFetchStatusCh) if err != nil { // failed to create the collector (this is different then // it's failing to run). 
we do not retry creation on failure diff --git a/internal/pkg/otel/manager/manager_test.go b/internal/pkg/otel/manager/manager_test.go index 3ca253bfbb2..f434a1ea54d 100644 --- a/internal/pkg/otel/manager/manager_test.go +++ b/internal/pkg/otel/manager/manager_test.go @@ -91,12 +91,12 @@ type testExecution struct { handle collectorHandle } -func (e *testExecution) startCollector(ctx context.Context, baseLogger *logger.Logger, logger *logger.Logger, cfg *confmap.Conf, errCh chan error, statusCh chan *status.AggregateStatus, forceFetchStatusCh chan struct{}) (collectorHandle, error) { +func (e *testExecution) startCollector(ctx context.Context, level string, baseLogger *logger.Logger, logger *logger.Logger, cfg *confmap.Conf, errCh chan error, statusCh chan *status.AggregateStatus, forceFetchStatusCh chan struct{}) (collectorHandle, error) { e.mtx.Lock() defer e.mtx.Unlock() var err error - e.handle, err = e.exec.startCollector(ctx, baseLogger, logger, cfg, errCh, statusCh, forceFetchStatusCh) + e.handle, err = e.exec.startCollector(ctx, "info", baseLogger, logger, cfg, errCh, statusCh, forceFetchStatusCh) return e.handle, err } @@ -118,6 +118,7 @@ type mockExecution struct { func (e *mockExecution) startCollector( ctx context.Context, + level string, _ *logger.Logger, _ *logger.Logger, cfg *confmap.Conf, diff --git a/internal/pkg/otel/translate/otelconfig.go b/internal/pkg/otel/translate/otelconfig.go index 7094ae4b953..ee2a774bb15 100644 --- a/internal/pkg/otel/translate/otelconfig.go +++ b/internal/pkg/otel/translate/otelconfig.go @@ -145,6 +145,24 @@ func GetOtelConfig( return otelConfig, nil } +func GetOTelLogLevel(level string) string { + if level != "" { + switch strings.ToLower(level) { + case "debug": + return "DEBUG" + case "info": + return "INFO" + case "warning": + return "WARN" + case "error": + return "ERROR" + default: + return "INFO" + } + } + return "INFO" +} + // VerifyComponentIsOtelSupported verifies that the given component can be run in an Otel 
Collector. It returns an error // indicating what the problem is, if it can't. func VerifyComponentIsOtelSupported(comp *component.Component) error { From c02b59d6444796452d90af2ce91a40257dfd822b Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Tue, 23 Dec 2025 17:34:10 +0530 Subject: [PATCH 02/25] add comment --- internal/pkg/agent/application/coordinator/coordinator.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index ee18a70e058..49a44b677fb 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -1679,10 +1679,13 @@ func (c *Coordinator) processConfigAgent(ctx context.Context, cfg *config.Config // set log level for the coordinator c.setLogLevel(ll) // set global log level - logger.SetLevel(logp.Level(ll)) + logger.SetLevel(ll) // set agent log level. // this is used by other parts of the agent to report the log level eg. 
otel manager - c.agentInfo.SetLogLevel(ctx, ll.String()) + err = c.agentInfo.SetLogLevel(ctx, ll.String()) + if err != nil { + c.logger.Errorf("failed to set agent log level: %v", err) + } c.logger.Infof("log level changed to %s", ll.String()) } From b9b5448ae4192d527b840d07313b8bc3c2dce6b5 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Tue, 23 Dec 2025 17:35:17 +0530 Subject: [PATCH 03/25] remove log flag --- internal/pkg/otel/manager/execution_subprocess.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/pkg/otel/manager/execution_subprocess.go b/internal/pkg/otel/manager/execution_subprocess.go index 1dadc6e975a..c4ca01dde45 100644 --- a/internal/pkg/otel/manager/execution_subprocess.go +++ b/internal/pkg/otel/manager/execution_subprocess.go @@ -51,7 +51,6 @@ func newSubprocessExecution(logLevel logp.Level, collectorPath string, uuid stri collectorPath: collectorPath, collectorArgs: []string{ fmt.Sprintf("--%s", OtelSetSupervisedFlagName), - fmt.Sprintf("--%s=%s", OtelSupervisedLoggingLevelFlagName, logLevel.String()), fmt.Sprintf("--%s=%s", OtelSupervisedMonitoringURLFlagName, monitoring.EDOTMonitoringEndpoint()), }, logLevel: logLevel, From d3fc592477291ae2fbafd378e46ffe57497e6ab3 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Wed, 24 Dec 2025 11:32:50 +0530 Subject: [PATCH 04/25] add changelog --- .../1766556100-reload-log-level.yaml | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 changelog/fragments/1766556100-reload-log-level.yaml diff --git a/changelog/fragments/1766556100-reload-log-level.yaml b/changelog/fragments/1766556100-reload-log-level.yaml new file mode 100644 index 00000000000..9f2e86b3551 --- /dev/null +++ b/changelog/fragments/1766556100-reload-log-level.yaml @@ -0,0 +1,45 @@ +# REQUIRED +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - 
enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: bug-fix + +# REQUIRED for all kinds +# Change summary; a 80ish characters long description of the change. +summary: Fix reloading agent.logging.level for standalone Elastic Agent + +# REQUIRED for breaking-change, deprecation, known-issue +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# description: + +# REQUIRED for breaking-change, deprecation, known-issue +# impact: + +# REQUIRED for breaking-change, deprecation, known-issue +# action: + +# REQUIRED for all kinds +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: elastic-agent + +# AUTOMATED +# OPTIONAL to manually add other PR URLs +# PR URL: A link the PR that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +# pr: https://github.com/owner/repo/1234 + +# AUTOMATED +# OPTIONAL to manually add other issue URLs +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. 
+# issue: https://github.com/owner/repo/1234 From 984062a40499730199243aa43177eaa56d0e924b Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Wed, 24 Dec 2025 11:50:33 +0530 Subject: [PATCH 05/25] add comments --- internal/pkg/agent/application/periodic.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/pkg/agent/application/periodic.go b/internal/pkg/agent/application/periodic.go index b03b6496278..8aec71494dc 100644 --- a/internal/pkg/agent/application/periodic.go +++ b/internal/pkg/agent/application/periodic.go @@ -17,6 +17,7 @@ import ( "github.com/elastic/elastic-agent/pkg/core/logger" ) +// periodic checks for local configuration changes type periodic struct { log *logger.Logger period time.Duration @@ -147,6 +148,7 @@ func newPeriodic( } } +// localConfigChange implements coordinator.ConfigChange for local file changes. type localConfigChange struct { cfg *config.Config } From 36e34a1eff92462a045d47520144ae6c2ff41674 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Tue, 30 Dec 2025 11:37:17 +0530 Subject: [PATCH 06/25] address first three review comments --- internal/pkg/agent/application/application.go | 2 +- .../application/coordinator/coordinator.go | 2 +- internal/pkg/agent/cmd/run.go | 17 +++----- .../pkg/otel/manager/execution_subprocess.go | 3 +- internal/pkg/otel/manager/manager.go | 3 +- internal/pkg/otel/manager/manager_test.go | 39 +++++++++---------- 6 files changed, 29 insertions(+), 37 deletions(-) diff --git a/internal/pkg/agent/application/application.go b/internal/pkg/agent/application/application.go index 1775cf39417..54ce5f27994 100644 --- a/internal/pkg/agent/application/application.go +++ b/internal/pkg/agent/application/application.go @@ -270,7 +270,7 @@ func New( otelManager, err := otelmanager.NewOTelManager( log.Named("otel_manager"), - logLevel, baseLogger, + baseLogger, agentInfo, cfg.Settings.Collector, monitor.ComponentMonitoringConfig, diff --git a/internal/pkg/agent/application/coordinator/coordinator.go 
b/internal/pkg/agent/application/coordinator/coordinator.go index 6735050a305..7e676fd8da1 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -1674,7 +1674,7 @@ func (c *Coordinator) processConfigAgent(ctx context.Context, cfg *config.Config } c.currentCfg = currentCfg - // check if log level has changed in standalone agent config + // check if log level has changed in received agent config ll := currentCfg.Settings.LoggingConfig.Level if ll != c.state.LogLevel { // set log level for the coordinator diff --git a/internal/pkg/agent/cmd/run.go b/internal/pkg/agent/cmd/run.go index 0f25c040539..7b5209f6bd7 100644 --- a/internal/pkg/agent/cmd/run.go +++ b/internal/pkg/agent/cmd/run.go @@ -354,18 +354,13 @@ func runElasticAgent( } // Ensure that the log level now matches what is configured in the agentInfo. - if agentInfo.LogLevel() != "" { - var lvl logp.Level - err = lvl.Unpack(agentInfo.LogLevel()) - if err != nil { - l.Error(errors.New(err, "failed to parse agent information log level")) - } else { - logLvl = lvl - logger.SetLevel(lvl) - } + var lvl logp.Level + err = lvl.Unpack(agentInfo.LogLevel()) + if err != nil { + l.Error(errors.New(err, "failed to parse agent information log level")) } else { - // Set the initial log level (either default or from config file) - logger.SetLevel(logLvl) + logLvl = lvl + logger.SetLevel(lvl) } // initiate agent watcher diff --git a/internal/pkg/otel/manager/execution_subprocess.go b/internal/pkg/otel/manager/execution_subprocess.go index c4ca01dde45..58b882fb028 100644 --- a/internal/pkg/otel/manager/execution_subprocess.go +++ b/internal/pkg/otel/manager/execution_subprocess.go @@ -40,7 +40,7 @@ const ( // newSubprocessExecution creates a new execution which runs the otel collector in a subprocess. A metricsPort or // healthCheckPort of 0 will result in a random port being used. 
-func newSubprocessExecution(logLevel logp.Level, collectorPath string, uuid string, metricsPort int, healthCheckPort int) (*subprocessExecution, error) { +func newSubprocessExecution(collectorPath string, uuid string, metricsPort int, healthCheckPort int) (*subprocessExecution, error) { componentType, err := component.NewType(healthCheckExtensionName) if err != nil { return nil, fmt.Errorf("cannot create component type: %w", err) @@ -53,7 +53,6 @@ func newSubprocessExecution(logLevel logp.Level, collectorPath string, uuid stri fmt.Sprintf("--%s", OtelSetSupervisedFlagName), fmt.Sprintf("--%s=%s", OtelSupervisedMonitoringURLFlagName, monitoring.EDOTMonitoringEndpoint()), }, - logLevel: logLevel, healthCheckExtensionID: healthCheckExtensionID, collectorMetricsPort: metricsPort, collectorHealthCheckPort: healthCheckPort, diff --git a/internal/pkg/otel/manager/manager.go b/internal/pkg/otel/manager/manager.go index 62c9fb11603..c5d8e45a430 100644 --- a/internal/pkg/otel/manager/manager.go +++ b/internal/pkg/otel/manager/manager.go @@ -130,7 +130,6 @@ type OTelManager struct { // NewOTelManager returns a OTelManager. 
func NewOTelManager( logger *logger.Logger, - logLevel logp.Level, baseLogger *logger.Logger, agentInfo info.Agent, agentCollectorConfig *configuration.CollectorConfig, @@ -166,7 +165,7 @@ func NewOTelManager( executable := filepath.Join(paths.Components(), collectorBinaryName) recoveryTimer = newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute) - exec, err = newSubprocessExecution(logLevel, executable, hcUUIDStr, collectorMetricsPort, collectorHealthCheckPort) + exec, err = newSubprocessExecution(executable, hcUUIDStr, collectorMetricsPort, collectorHealthCheckPort) if err != nil { return nil, fmt.Errorf("failed to create subprocess execution: %w", err) } diff --git a/internal/pkg/otel/manager/manager_test.go b/internal/pkg/otel/manager/manager_test.go index ae0b507c968..267f02c8c84 100644 --- a/internal/pkg/otel/manager/manager_test.go +++ b/internal/pkg/otel/manager/manager_test.go @@ -29,7 +29,6 @@ import ( "gopkg.in/yaml.v2" "github.com/elastic/elastic-agent-client/v7/pkg/client" - "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/logp/logptest" "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component" @@ -96,7 +95,7 @@ func (e *testExecution) startCollector(ctx context.Context, level string, baseLo defer e.mtx.Unlock() var err error - e.handle, err = e.exec.startCollector(ctx, "info", baseLogger, logger, cfg, errCh, statusCh, forceFetchStatusCh) + e.handle, err = e.exec.startCollector(ctx, level, baseLogger, logger, cfg, errCh, statusCh, forceFetchStatusCh) return e.handle, err } @@ -356,7 +355,7 @@ func TestOTelManager_Run(t *testing.T) { if err != nil { return nil, fmt.Errorf("cannot generate UUID: %w", err) } - return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0) + return newSubprocessExecution(testBinary, hcUUID.String(), 0, 0) }, restarter: 
newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { @@ -388,7 +387,7 @@ func TestOTelManager_Run(t *testing.T) { if err != nil { return nil, fmt.Errorf("cannot generate UUID: %w", err) } - return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0) + return newSubprocessExecution(testBinary, hcUUID.String(), 0, 0) }, restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { @@ -421,7 +420,7 @@ func TestOTelManager_Run(t *testing.T) { if err != nil { return nil, fmt.Errorf("cannot generate UUID: %w", err) } - return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0) + return newSubprocessExecution(testBinary, hcUUID.String(), 0, 0) }, restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { @@ -468,7 +467,7 @@ func TestOTelManager_Run(t *testing.T) { if err != nil { return nil, fmt.Errorf("cannot generate UUID: %w", err) } - return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0) + return newSubprocessExecution(testBinary, hcUUID.String(), 0, 0) }, restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { @@ -507,7 +506,7 @@ func TestOTelManager_Run(t *testing.T) { if err != nil { return nil, fmt.Errorf("cannot generate UUID: %w", err) } - return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0) + 
return newSubprocessExecution(testBinary, hcUUID.String(), 0, 0) }, restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { @@ -552,7 +551,7 @@ func TestOTelManager_Run(t *testing.T) { if err != nil { return nil, fmt.Errorf("cannot generate UUID: %w", err) } - subprocessExec, err := newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0) + subprocessExec, err := newSubprocessExecution(testBinary, hcUUID.String(), 0, 0) if err != nil { return nil, err } @@ -609,7 +608,7 @@ func TestOTelManager_Run(t *testing.T) { if err != nil { return nil, fmt.Errorf("cannot generate UUID: %w", err) } - subprocessExec, err := newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0) + subprocessExec, err := newSubprocessExecution(testBinary, hcUUID.String(), 0, 0) if err != nil { return nil, err } @@ -666,7 +665,7 @@ func TestOTelManager_Run(t *testing.T) { if err != nil { return nil, fmt.Errorf("cannot generate UUID: %w", err) } - return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0) + return newSubprocessExecution(testBinary, hcUUID.String(), 0, 0) }, restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { @@ -705,7 +704,7 @@ func TestOTelManager_Run(t *testing.T) { if err != nil { return nil, fmt.Errorf("cannot generate UUID: %w", err) } - return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0) + return newSubprocessExecution(testBinary, hcUUID.String(), 0, 0) }, restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), skipListeningErrors: true, @@ -756,7 +755,7 @@ func TestOTelManager_Run(t *testing.T) { if err != nil { 
return nil, fmt.Errorf("cannot generate UUID: %w", err) } - return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0) + return newSubprocessExecution(testBinary, hcUUID.String(), 0, 0) }, restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { @@ -894,13 +893,13 @@ func TestOTelManager_Logging(t *testing.T) { if err != nil { return nil, fmt.Errorf("cannot generate UUID: %w", err) } - return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0) + return newSubprocessExecution(testBinary, hcUUID.String(), 0, 0) }, }, } { t.Run(tc.name, func(t *testing.T) { // the execution mode passed here is overridden below so it is irrelevant - m, err := NewOTelManager(l, logp.DebugLevel, base, nil, nil, nil, waitTimeForStop) + m, err := NewOTelManager(l, base, nil, nil, nil, waitTimeForStop) require.NoError(t, err, "could not create otel manager") executionMode, err := tc.execModeFn(m.collectorRunErr) @@ -971,7 +970,7 @@ func TestOTelManager_Ports(t *testing.T) { if err != nil { return nil, fmt.Errorf("cannot generate UUID: %w", err) } - return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), metricsPort, healthCheckPort) + return newSubprocessExecution(testBinary, hcUUID.String(), metricsPort, healthCheckPort) }, healthCheckEnabled: true, }, @@ -992,7 +991,7 @@ func TestOTelManager_Ports(t *testing.T) { // the execution mode passed here is overridden below so it is irrelevant m, err := NewOTelManager( l, - logp.DebugLevel, + base, nil, &agentCollectorConfig, @@ -1115,7 +1114,7 @@ func TestOTelManager_PortConflict(t *testing.T) { // the execution mode passed here is overridden below so it is irrelevant m, err := NewOTelManager( l, - logp.DebugLevel, + base, nil, nil, @@ -1123,7 +1122,7 @@ func TestOTelManager_PortConflict(t *testing.T) { 
waitTimeForStop, ) require.NoError(t, err, "could not create otel manager") - executionMode, err := newSubprocessExecution(logp.DebugLevel, testBinary, strings.TrimPrefix(m.healthCheckExtID, "extension:healthcheckv2/"), 0, 0) + executionMode, err := newSubprocessExecution(testBinary, strings.TrimPrefix(m.healthCheckExtID, "extension:healthcheckv2/"), 0, 0) require.NoError(t, err, "could not create subprocess execution mode") m.execution = executionMode @@ -1763,7 +1762,7 @@ func TestManagerAlwaysEmitsStoppedStatesForComponents(t *testing.T) { // Create manager with test dependencies mgr, err := NewOTelManager( testLogger, - logp.DebugLevel, + testLogger, agentInfo, nil, @@ -1859,7 +1858,7 @@ func TestManagerEmitsStartingStatesWhenHealthcheckIsUnavailable(t *testing.T) { // Create manager with test dependencies mgr, err := NewOTelManager( testLogger, - logp.DebugLevel, + testLogger, agentInfo, nil, From 4488ba0f1eed068ab9f41d1bf5f747c1e803c8b8 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Tue, 30 Dec 2025 12:37:09 +0530 Subject: [PATCH 07/25] fix test --- internal/pkg/otel/manager/manager_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/pkg/otel/manager/manager_test.go b/internal/pkg/otel/manager/manager_test.go index 267f02c8c84..b86334886a9 100644 --- a/internal/pkg/otel/manager/manager_test.go +++ b/internal/pkg/otel/manager/manager_test.go @@ -819,6 +819,7 @@ func TestOTelManager_Run(t *testing.T) { collectorRunErr: make(chan error), recoveryTimer: tc.restarter, stopTimeout: waitTimeForStop, + agentInfo: &info.AgentInfo{}, } executionMode, err := tc.execModeFn(m.collectorRunErr) From 73979ce176e28c331dc3f28a9d5428910739629d Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Tue, 30 Dec 2025 17:09:51 +0530 Subject: [PATCH 08/25] add standalone check --- .../application/coordinator/coordinator.go | 26 ++++++++++--------- internal/pkg/otel/manager/manager_test.go | 13 +++------- 2 files changed, 18 insertions(+), 21 deletions(-) diff --git 
a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index 7e676fd8da1..ca92c464803 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -1675,19 +1675,21 @@ func (c *Coordinator) processConfigAgent(ctx context.Context, cfg *config.Config c.currentCfg = currentCfg // check if log level has changed in received agent config - ll := currentCfg.Settings.LoggingConfig.Level - if ll != c.state.LogLevel { - // set log level for the coordinator - c.setLogLevel(ll) - // set global log level - logger.SetLevel(ll) - // set agent log level. - // this is used by other parts of the agent to report the log level eg. otel manager - err = c.agentInfo.SetLogLevel(ctx, ll.String()) - if err != nil { - c.logger.Errorf("failed to set agent log level: %v", err) + if c.agentInfo.IsStandalone() { + ll := currentCfg.Settings.LoggingConfig.Level + if ll != c.state.LogLevel { + // set log level for the coordinator + c.setLogLevel(ll) + // set global log level + logger.SetLevel(ll) + // set agent log level. + // this is used by other parts of the agent to report the log level eg. 
otel manager + err = c.agentInfo.SetLogLevel(ctx, ll.String()) + if err != nil { + c.logger.Errorf("failed to set agent log level: %v", err) + } + c.logger.Infof("log level changed to %s", ll.String()) } - c.logger.Infof("log level changed to %s", ll.String()) } if c.vars != nil { diff --git a/internal/pkg/otel/manager/manager_test.go b/internal/pkg/otel/manager/manager_test.go index b86334886a9..95289a8f1b1 100644 --- a/internal/pkg/otel/manager/manager_test.go +++ b/internal/pkg/otel/manager/manager_test.go @@ -900,7 +900,7 @@ func TestOTelManager_Logging(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { // the execution mode passed here is overridden below so it is irrelevant - m, err := NewOTelManager(l, base, nil, nil, nil, waitTimeForStop) + m, err := NewOTelManager(l, base, &info.AgentInfo{}, nil, nil, waitTimeForStop) require.NoError(t, err, "could not create otel manager") executionMode, err := tc.execModeFn(m.collectorRunErr) @@ -992,9 +992,8 @@ func TestOTelManager_Ports(t *testing.T) { // the execution mode passed here is overridden below so it is irrelevant m, err := NewOTelManager( l, - base, - nil, + &info.AgentInfo{}, &agentCollectorConfig, nil, waitTimeForStop, @@ -1115,9 +1114,8 @@ func TestOTelManager_PortConflict(t *testing.T) { // the execution mode passed here is overridden below so it is irrelevant m, err := NewOTelManager( l, - base, - nil, + &info.AgentInfo{}, nil, nil, waitTimeForStop, @@ -1752,7 +1750,6 @@ func TestOTelManagerEndToEnd(t *testing.T) { func TestManagerAlwaysEmitsStoppedStatesForComponents(t *testing.T) { // Setup test logger and dependencies testLogger, _ := loggertest.New("test") - agentInfo := &info.AgentInfo{} beatMonitoringConfigGetter := mockBeatMonitoringConfigGetter collectorStarted := make(chan struct{}) @@ -1763,9 +1760,8 @@ func TestManagerAlwaysEmitsStoppedStatesForComponents(t *testing.T) { // Create manager with test dependencies mgr, err := NewOTelManager( testLogger, - testLogger, - agentInfo, + 
&info.AgentInfo{}, nil, beatMonitoringConfigGetter, time.Second, @@ -1859,7 +1855,6 @@ func TestManagerEmitsStartingStatesWhenHealthcheckIsUnavailable(t *testing.T) { // Create manager with test dependencies mgr, err := NewOTelManager( testLogger, - testLogger, agentInfo, nil, From c3318a72432a2a7070c7acf7259fc87dd9d80768 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Tue, 30 Dec 2025 19:25:46 +0530 Subject: [PATCH 09/25] pass local log --- .../application/coordinator/coordinator.go | 12 ++--- .../coordinator/coordinator_test.go | 2 +- .../coordinator/coordinator_unit_test.go | 9 +++- internal/pkg/otel/manager/manager.go | 27 +++++++--- internal/pkg/otel/manager/manager_test.go | 51 ++++++++++--------- 5 files changed, 58 insertions(+), 43 deletions(-) diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index ca92c464803..275de9e4e38 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -162,7 +162,7 @@ type OTelManager interface { Runner // Update updates the current plain configuration for the otel collector and components. - Update(*confmap.Conf, *monitoringCfg.MonitoringConfig, []component.Component) + Update(*confmap.Conf, *monitoringCfg.MonitoringConfig, logp.Level, []component.Component) // WatchCollector returns a channel to watch for collector status updates. 
WatchCollector() <-chan *status.AggregateStatus @@ -1674,7 +1674,7 @@ func (c *Coordinator) processConfigAgent(ctx context.Context, cfg *config.Config } c.currentCfg = currentCfg - // check if log level has changed in received agent config + // check if log level has changed for standalone elastic-agent if c.agentInfo.IsStandalone() { ll := currentCfg.Settings.LoggingConfig.Level if ll != c.state.LogLevel { @@ -1682,12 +1682,6 @@ func (c *Coordinator) processConfigAgent(ctx context.Context, cfg *config.Config c.setLogLevel(ll) // set global log level logger.SetLevel(ll) - // set agent log level. - // this is used by other parts of the agent to report the log level eg. otel manager - err = c.agentInfo.SetLogLevel(ctx, ll.String()) - if err != nil { - c.logger.Errorf("failed to set agent log level: %v", err) - } c.logger.Infof("log level changed to %s", ll.String()) } } @@ -1893,7 +1887,7 @@ func (c *Coordinator) updateManagersWithConfig(model *component.Model) { } c.logger.With("component_ids", componentIDs).Info("Using OpenTelemetry collector runtime.") } - c.otelMgr.Update(c.otelCfg, c.currentCfg.Settings.MonitoringConfig, otelModel.Components) + c.otelMgr.Update(c.otelCfg, c.currentCfg.Settings.MonitoringConfig, c.state.LogLevel, otelModel.Components) } // splitModelBetweenManager splits the model components between the runtime manager and the otel manager. 
diff --git a/internal/pkg/agent/application/coordinator/coordinator_test.go b/internal/pkg/agent/application/coordinator/coordinator_test.go index f7a3fd491fd..72a54569aed 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_test.go @@ -1457,7 +1457,7 @@ func (f *fakeOTelManager) Errors() <-chan error { return f.errChan } -func (f *fakeOTelManager) Update(cfg *confmap.Conf, monitoring *monitoringCfg.MonitoringConfig, components []component.Component) { +func (f *fakeOTelManager) Update(cfg *confmap.Conf, monitoring *monitoringCfg.MonitoringConfig, ll logp.Level, components []component.Component) { var collectorResult, componentResult error if f.updateCollectorCallback != nil { collectorResult = f.updateCollectorCallback(cfg) diff --git a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go index 3fcebe52df5..2e0273fbfed 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go @@ -469,7 +469,9 @@ func TestCoordinatorReportsInvalidPolicy(t *testing.T) { }() tmpDir := t.TempDir() - upgradeMgr, err := upgrade.NewUpgrader(log, &artifact.Config{}, nil, &info.AgentInfo{}, new(upgrade.AgentWatcherHelper), ttl.NewTTLMarkerRegistry(nil, tmpDir)) + agentInfo, err := info.NewAgentInfo(ctx, false) + require.NoError(t, err) + upgradeMgr, err := upgrade.NewUpgrader(log, &artifact.Config{}, nil, agentInfo, new(upgrade.AgentWatcherHelper), ttl.NewTTLMarkerRegistry(nil, tmpDir)) require.NoError(t, err, "errored when creating a new upgrader") // Channels have buffer length 1, so we don't have to run on multiple @@ -502,6 +504,7 @@ func TestCoordinatorReportsInvalidPolicy(t *testing.T) { ast: emptyAST(t), componentPIDTicker: time.NewTicker(time.Second * 30), secretMarkerFunc: testSecretMarkerFunc, + agentInfo: agentInfo, } 
// Send an invalid config update and confirm that Coordinator reports @@ -594,6 +597,9 @@ func TestCoordinatorReportsComponentModelError(t *testing.T) { defer cancel() logger := logp.NewLogger("testing") + agentInfo, err := info.NewAgentInfo(t.Context(), false) + require.NoError(t, err) + // Channels have buffer length 1 so we don't have to run on multiple // goroutines. stateChan := make(chan State, 1) @@ -619,6 +625,7 @@ func TestCoordinatorReportsComponentModelError(t *testing.T) { ast: emptyAST(t), componentPIDTicker: time.NewTicker(time.Second * 30), secretMarkerFunc: testSecretMarkerFunc, + agentInfo: agentInfo, } // This configuration produces a valid AST but its EQL condition is diff --git a/internal/pkg/otel/manager/manager.go b/internal/pkg/otel/manager/manager.go index c5d8e45a430..57c6cd1e0ac 100644 --- a/internal/pkg/otel/manager/manager.go +++ b/internal/pkg/otel/manager/manager.go @@ -64,6 +64,7 @@ type configUpdate struct { collectorCfg *confmap.Conf monitoringCfg *monitoringCfg.MonitoringConfig components []component.Component + logLevel logp.Level } // OTelManager is a manager that manages the lifecycle of the OTel collector inside of the Elastic Agent. @@ -125,6 +126,9 @@ type OTelManager struct { // stopTimeout is the timeout to wait for the collector to stop. stopTimeout time.Duration + + // log level of the collector + logLevel string } // NewOTelManager returns a OTelManager. 
@@ -187,6 +191,7 @@ func NewOTelManager( recoveryTimer: recoveryTimer, collectorRunErr: make(chan error), stopTimeout: stopTimeout, + logLevel: agentInfo.LogLevel(), }, nil } @@ -228,7 +233,8 @@ func (m *OTelManager) Run(ctx context.Context) error { newRetries := m.recoveryRetries.Add(1) m.logger.Infof("collector recovery restarting, total retries: %d", newRetries) - m.proc, err = m.execution.startCollector(ctx, m.agentInfo.LogLevel(), m.baseLogger, m.logger, m.mergedCollectorCfg, m.collectorRunErr, collectorStatusCh, forceFetchStatusCh) + // we use log level set on agentInfo at + m.proc, err = m.execution.startCollector(ctx, m.logLevel, m.baseLogger, m.logger, m.mergedCollectorCfg, m.collectorRunErr, collectorStatusCh, forceFetchStatusCh) if err != nil { // report a startup error (this gets reported as status) m.reportStartupErr(ctx, err) @@ -258,7 +264,7 @@ func (m *OTelManager) Run(ctx context.Context) error { // in this rare case the collector stopped running but a configuration was // provided and the collector stopped with a clean exit - m.proc, err = m.execution.startCollector(ctx, m.agentInfo.LogLevel(), m.baseLogger, m.logger, m.mergedCollectorCfg, m.collectorRunErr, collectorStatusCh, forceFetchStatusCh) + m.proc, err = m.execution.startCollector(ctx, m.logLevel, m.baseLogger, m.logger, m.mergedCollectorCfg, m.collectorRunErr, collectorStatusCh, forceFetchStatusCh) if err != nil { // report a startup error (this gets reported as status) m.reportStartupErr(ctx, err) @@ -305,6 +311,7 @@ func (m *OTelManager) Run(ctx context.Context) error { configChanged, configUpdateErr := m.maybeUpdateMergedConfig(mergedCfg) m.collectorCfg = cfgUpdate.collectorCfg m.components = cfgUpdate.components + m.logLevel = cfgUpdate.logLevel.String() m.mx.Unlock() if configUpdateErr != nil { @@ -315,7 +322,7 @@ func (m *OTelManager) Run(ctx context.Context) error { m.logger.Debugf( "new config hash (%d) is different than the old config hash (%d), applying update", 
m.mergedCollectorCfgHash, previousConfigHash) - applyErr := m.applyMergedConfig(ctx, collectorStatusCh, m.collectorRunErr, forceFetchStatusCh) + applyErr := m.applyMergedConfig(ctx, collectorStatusCh, m.collectorRunErr, forceFetchStatusCh, cfgUpdate.logLevel) // only report the error if we actually apply the update // otherwise, we could override an actual error with a nil in the channel when the collector // state doesn't actually change @@ -375,7 +382,7 @@ func buildMergedConfig( } // get log level from agent info - level := translate.GetOTelLogLevel(agentInfo.LogLevel()) + level := translate.GetOTelLogLevel(cfgUpdate.logLevel.String()) if err := componentOtelCfg.Merge(confmap.NewFromStringMap(map[string]any{"service::telemetry::logs::level": level})); err != nil { return nil, fmt.Errorf("failed to set log level in otel config: %w", err) } @@ -522,7 +529,12 @@ func injectMonitoringReceiver( return config.Merge(confmap.NewFromStringMap(receiverCfg)) } -func (m *OTelManager) applyMergedConfig(ctx context.Context, collectorStatusCh chan *status.AggregateStatus, collectorRunErr chan error, forceFetchStatusCh chan struct{}) error { +func (m *OTelManager) applyMergedConfig(ctx context.Context, + collectorStatusCh chan *status.AggregateStatus, + collectorRunErr chan error, + forceFetchStatusCh chan struct{}, + logLevel logp.Level, +) error { if m.proc != nil { m.proc.Stop(m.stopTimeout) m.proc = nil @@ -550,7 +562,7 @@ func (m *OTelManager) applyMergedConfig(ctx context.Context, collectorStatusCh c } else { // either a new configuration or the first configuration // that results in the collector being started - proc, err := m.execution.startCollector(ctx, m.agentInfo.LogLevel(), m.baseLogger, m.logger, m.mergedCollectorCfg, collectorRunErr, collectorStatusCh, forceFetchStatusCh) + proc, err := m.execution.startCollector(ctx, logLevel.String(), m.baseLogger, m.logger, m.mergedCollectorCfg, collectorRunErr, collectorStatusCh, forceFetchStatusCh) if err != nil { // failed 
to create the collector (this is different then // it's failing to run). we do not retry creation on failure @@ -570,11 +582,12 @@ func (m *OTelManager) applyMergedConfig(ctx context.Context, collectorStatusCh c } // Update sends collector configuration and component updates to the manager's run loop. -func (m *OTelManager) Update(cfg *confmap.Conf, monitoring *monitoringCfg.MonitoringConfig, components []component.Component) { +func (m *OTelManager) Update(cfg *confmap.Conf, monitoring *monitoringCfg.MonitoringConfig, ll logp.Level, components []component.Component) { cfgUpdate := configUpdate{ collectorCfg: cfg, monitoringCfg: monitoring, components: components, + logLevel: ll, } // we care only about the latest config update diff --git a/internal/pkg/otel/manager/manager_test.go b/internal/pkg/otel/manager/manager_test.go index 95289a8f1b1..713547511fe 100644 --- a/internal/pkg/otel/manager/manager_test.go +++ b/internal/pkg/otel/manager/manager_test.go @@ -29,6 +29,7 @@ import ( "gopkg.in/yaml.v2" "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/logp/logptest" "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component" @@ -362,19 +363,19 @@ func TestOTelManager_Run(t *testing.T) { // ensure that it got healthy cfg := confmap.NewFromStringMap(testConfig) updateTime := time.Now() - m.Update(cfg, nil, nil) + m.Update(cfg, nil, logp.InfoLevel, nil) e.EnsureHealthy(t, updateTime) // trigger update updateTime = time.Now() ok := cfg.Delete("service::telemetry::logs::level") // modify the config require.True(t, ok) - m.Update(cfg, nil, nil) + m.Update(cfg, nil, logp.InfoLevel, nil) e.EnsureHealthy(t, updateTime) // no configuration should stop the runner updateTime = time.Now() - m.Update(nil, nil, nil) + m.Update(nil, nil, logp.InfoLevel, nil) 
e.EnsureOffWithoutError(t, updateTime) assert.True(t, m.recoveryTimer.IsStopped(), "restart timer should be stopped") assert.Equal(t, uint32(0), m.recoveryRetries.Load(), "recovery retries should be 0") @@ -394,7 +395,7 @@ func TestOTelManager_Run(t *testing.T) { // ensure that it got healthy cfg := confmap.NewFromStringMap(testConfig) updateTime := time.Now() - m.Update(cfg, nil, nil) + m.Update(cfg, nil, logp.InfoLevel, nil) e.EnsureHealthy(t, updateTime) // stop it, this should be restarted by the manager @@ -407,7 +408,7 @@ func TestOTelManager_Run(t *testing.T) { // no configuration should stop the runner updateTime = time.Now() - m.Update(nil, nil, nil) + m.Update(nil, nil, logp.InfoLevel, nil) e.EnsureOffWithoutError(t, updateTime) assert.True(t, m.recoveryTimer.IsStopped(), "restart timer should be stopped") assert.Equal(t, uint32(0), m.recoveryRetries.Load(), "recovery retries should be 0") @@ -427,7 +428,7 @@ func TestOTelManager_Run(t *testing.T) { // ensure that it got healthy cfg := confmap.NewFromStringMap(testConfig) updateTime := time.Now() - m.Update(cfg, nil, nil) + m.Update(cfg, nil, logp.InfoLevel, nil) e.EnsureHealthy(t, updateTime) require.EqualValues(t, 0, countHealthCheckExtensionStatuses(e.getCollectorStatus()), "health check extension status count should be 0") @@ -454,7 +455,7 @@ func TestOTelManager_Run(t *testing.T) { // no configuration should stop the runner updateTime = time.Now() - m.Update(nil, nil, nil) + m.Update(nil, nil, logp.InfoLevel, nil) e.EnsureOffWithoutError(t, updateTime) assert.True(t, m.recoveryTimer.IsStopped(), "restart timer should be stopped") assert.Equal(t, uint32(3), seenRecoveredTimes, "recovery retries should be 3") @@ -478,7 +479,7 @@ func TestOTelManager_Run(t *testing.T) { }) cfg := confmap.NewFromStringMap(testConfig) - m.Update(cfg, nil, nil) + m.Update(cfg, nil, logp.InfoLevel, nil) seenRecoveredTimes := uint32(0) require.Eventually(t, func() bool { @@ -493,7 +494,7 @@ func TestOTelManager_Run(t 
*testing.T) { // no configuration should stop the runner updateTime = time.Now() - m.Update(nil, nil, nil) + m.Update(nil, nil, logp.InfoLevel, nil) e.EnsureOffWithoutError(t, updateTime) require.True(t, m.recoveryTimer.IsStopped(), "restart timer should be stopped") assert.GreaterOrEqual(t, uint32(3), seenRecoveredTimes, "recovery retries should be 3") @@ -518,7 +519,7 @@ func TestOTelManager_Run(t *testing.T) { }) cfg := confmap.NewFromStringMap(testConfig) - m.Update(cfg, nil, nil) + m.Update(cfg, nil, logp.InfoLevel, nil) // ensure that it reports a generic fatal error for all components, a panic cannot be assigned to // a specific component in the collector @@ -575,7 +576,7 @@ func TestOTelManager_Run(t *testing.T) { // ensure that it got healthy cfg := confmap.NewFromStringMap(testConfig) updateTime := time.Now() - m.Update(cfg, nil, nil) + m.Update(cfg, nil, logp.InfoLevel, nil) e.EnsureHealthy(t, updateTime) // stop the manager to simulate that elastic-agent is shutting down @@ -632,7 +633,7 @@ func TestOTelManager_Run(t *testing.T) { // ensure that it got healthy cfg := confmap.NewFromStringMap(testConfig) updateTime := time.Now() - m.Update(cfg, nil, nil) + m.Update(cfg, nil, logp.InfoLevel, nil) e.EnsureHealthy(t, updateTime) // stop the manager to simulate that elastic-agent is shutting down @@ -691,7 +692,7 @@ func TestOTelManager_Run(t *testing.T) { require.NoError(t, err, "failed to inject user health extension") updateTime := time.Now() - m.Update(cfg, nil, nil) + m.Update(cfg, nil, logp.InfoLevel, nil) e.EnsureHealthy(t, updateTime) require.EqualValues(t, 1, countHealthCheckExtensionStatuses(e.getCollectorStatus()), "health check extension status count should be 1") @@ -719,7 +720,7 @@ func TestOTelManager_Run(t *testing.T) { // // this does give a good test of a truly invalid configuration cfg := confmap.New() // empty config - m.Update(cfg, nil, nil) + m.Update(cfg, nil, logp.InfoLevel, nil) // delay between updates to ensure the collector will 
have to fail <-time.After(100 * time.Millisecond) @@ -778,7 +779,7 @@ func TestOTelManager_Run(t *testing.T) { }, }, }) - m.Update(cfg, nil, nil) + m.Update(cfg, nil, logp.InfoLevel, nil) e.EnsureFatal(t, time.Now().Add(time.Second), func(collectT *assert.CollectT, _ *EventTime[error], latestStatus *EventTime[*status.AggregateStatus]) { status := latestStatus.Value() @@ -925,7 +926,7 @@ func TestOTelManager_Logging(t *testing.T) { }() cfg := confmap.NewFromStringMap(testConfig) - m.Update(cfg, nil, nil) + m.Update(cfg, nil, logp.InfoLevel, nil) // the collector should log to the base logger assert.EventuallyWithT(t, func(collect *assert.CollectT) { @@ -1024,7 +1025,7 @@ func TestOTelManager_Ports(t *testing.T) { cfg := confmap.NewFromStringMap(testConfig) cfg.Delete("service::telemetry::metrics::level") // change this to default - m.Update(cfg, nil, nil) + m.Update(cfg, nil, logp.InfoLevel, nil) // wait until status reflects the config update require.EventuallyWithT(t, func(collect *assert.CollectT) { @@ -1147,7 +1148,7 @@ func TestOTelManager_PortConflict(t *testing.T) { // no retries, collector is not running assert.Equal(t, uint32(0), m.recoveryRetries.Load()) - m.Update(cfg, nil, nil) + m.Update(cfg, nil, logp.InfoLevel, nil) // wait until status reflects the config update require.EventuallyWithT(t, func(collect *assert.CollectT) { @@ -1601,7 +1602,7 @@ func TestOTelManagerEndToEnd(t *testing.T) { components := []component.Component{testComp} t.Run("collector config is passed down to the collector execution", func(t *testing.T) { - mgr.Update(collectorCfg, nil, nil) + mgr.Update(collectorCfg, nil, logp.InfoLevel, nil) select { case <-collectorStarted: case <-ctx.Done(): @@ -1634,7 +1635,7 @@ func TestOTelManagerEndToEnd(t *testing.T) { }) t.Run("component config is passed down to the otel manager", func(t *testing.T) { - mgr.Update(collectorCfg, nil, components) + mgr.Update(collectorCfg, nil, logp.InfoLevel, components) select { case <-collectorStarted: case 
<-ctx.Done(): @@ -1650,7 +1651,7 @@ func TestOTelManagerEndToEnd(t *testing.T) { }) t.Run("empty collector config leaves the component config running", func(t *testing.T) { - mgr.Update(nil, nil, components) + mgr.Update(nil, nil, logp.InfoLevel, components) select { case <-collectorStarted: case <-ctx.Done(): @@ -1799,7 +1800,7 @@ func TestManagerAlwaysEmitsStoppedStatesForComponents(t *testing.T) { }, } // start the collector by giving it a mock config - mgr.Update(nil, nil, components) + mgr.Update(nil, nil, logp.InfoLevel, components) select { case <-ctx.Done(): t.Fatal("timeout waiting for collector status update") @@ -1822,7 +1823,7 @@ func TestManagerAlwaysEmitsStoppedStatesForComponents(t *testing.T) { assert.Equal(t, componentState.State.State, client.UnitStateHealthy) // stop the component by sending a nil config - mgr.Update(nil, nil, nil) + mgr.Update(nil, nil, logp.InfoLevel, nil) // then send a nil status, indicating the collector is not running the component anymore // do this a few times to see if the STOPPED state isn't lost along the way @@ -1880,7 +1881,7 @@ func TestManagerEmitsStartingStatesWhenHealthcheckIsUnavailable(t *testing.T) { Event: componentstatus.NewEvent(componentstatus.StatusStarting), } // start the collector by giving it a mock config - mgr.Update(nil, nil, components) + mgr.Update(nil, nil, logp.InfoLevel, components) select { case <-ctx.Done(): t.Fatal("timeout waiting for collector status update") @@ -1904,7 +1905,7 @@ func TestManagerEmitsStartingStatesWhenHealthcheckIsUnavailable(t *testing.T) { assert.Equal(t, componentState.State.Message, "STARTING") // stop the component by sending a nil config - mgr.Update(nil, nil, nil) + mgr.Update(nil, nil, logp.InfoLevel, nil) // then send a nil status, indicating the collector is not running the component anymore // do this a few times to see if the STOPPED state isn't lost along the way From e316032749b116c3c1570d2cfa824fbfcdd09e69 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: 
Tue, 30 Dec 2025 19:35:12 +0530 Subject: [PATCH 10/25] add log level to otel manager struct --- internal/pkg/agent/application/application.go | 1 + internal/pkg/otel/manager/manager.go | 3 ++- internal/pkg/otel/manager/manager_test.go | 8 ++++++-- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/internal/pkg/agent/application/application.go b/internal/pkg/agent/application/application.go index 54ce5f27994..804be353055 100644 --- a/internal/pkg/agent/application/application.go +++ b/internal/pkg/agent/application/application.go @@ -270,6 +270,7 @@ func New( otelManager, err := otelmanager.NewOTelManager( log.Named("otel_manager"), + logLevel, baseLogger, agentInfo, cfg.Settings.Collector, diff --git a/internal/pkg/otel/manager/manager.go b/internal/pkg/otel/manager/manager.go index 57c6cd1e0ac..0e8e7f0f585 100644 --- a/internal/pkg/otel/manager/manager.go +++ b/internal/pkg/otel/manager/manager.go @@ -134,6 +134,7 @@ type OTelManager struct { // NewOTelManager returns a OTelManager. 
func NewOTelManager( logger *logger.Logger, + logLevel logp.Level, baseLogger *logger.Logger, agentInfo info.Agent, agentCollectorConfig *configuration.CollectorConfig, @@ -191,7 +192,7 @@ func NewOTelManager( recoveryTimer: recoveryTimer, collectorRunErr: make(chan error), stopTimeout: stopTimeout, - logLevel: agentInfo.LogLevel(), + logLevel: logLevel.String(), }, nil } diff --git a/internal/pkg/otel/manager/manager_test.go b/internal/pkg/otel/manager/manager_test.go index 713547511fe..d988b298b66 100644 --- a/internal/pkg/otel/manager/manager_test.go +++ b/internal/pkg/otel/manager/manager_test.go @@ -901,7 +901,7 @@ func TestOTelManager_Logging(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { // the execution mode passed here is overridden below so it is irrelevant - m, err := NewOTelManager(l, base, &info.AgentInfo{}, nil, nil, waitTimeForStop) + m, err := NewOTelManager(l, logp.InfoLevel, base, &info.AgentInfo{}, nil, nil, waitTimeForStop) require.NoError(t, err, "could not create otel manager") executionMode, err := tc.execModeFn(m.collectorRunErr) @@ -993,6 +993,7 @@ func TestOTelManager_Ports(t *testing.T) { // the execution mode passed here is overridden below so it is irrelevant m, err := NewOTelManager( l, + logp.InfoLevel, base, &info.AgentInfo{}, &agentCollectorConfig, @@ -1115,6 +1116,7 @@ func TestOTelManager_PortConflict(t *testing.T) { // the execution mode passed here is overridden below so it is irrelevant m, err := NewOTelManager( l, + logp.InfoLevel, base, &info.AgentInfo{}, nil, @@ -1761,8 +1763,9 @@ func TestManagerAlwaysEmitsStoppedStatesForComponents(t *testing.T) { // Create manager with test dependencies mgr, err := NewOTelManager( testLogger, + logp.InfoLevel, testLogger, - &info.AgentInfo{}, + nil, nil, beatMonitoringConfigGetter, time.Second, @@ -1856,6 +1859,7 @@ func TestManagerEmitsStartingStatesWhenHealthcheckIsUnavailable(t *testing.T) { // Create manager with test dependencies mgr, err := NewOTelManager( testLogger, + 
logp.InfoLevel, testLogger, agentInfo, nil, From bb0ab3dec3fe74b13bf49d171262b20c68ea7685 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Tue, 30 Dec 2025 20:53:47 +0530 Subject: [PATCH 11/25] fix ci --- internal/pkg/otel/manager/manager_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/pkg/otel/manager/manager_test.go b/internal/pkg/otel/manager/manager_test.go index d988b298b66..a48a7b9f57b 100644 --- a/internal/pkg/otel/manager/manager_test.go +++ b/internal/pkg/otel/manager/manager_test.go @@ -1765,7 +1765,7 @@ func TestManagerAlwaysEmitsStoppedStatesForComponents(t *testing.T) { testLogger, logp.InfoLevel, testLogger, - nil, + &info.AgentInfo{}, nil, beatMonitoringConfigGetter, time.Second, From e84bceefa4e27a48ea0c6d40a6420c676012aa65 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Wed, 31 Dec 2025 16:50:28 +0530 Subject: [PATCH 12/25] address review comments --- .../application/coordinator/coordinator.go | 5 +- .../pkg/otel/manager/execution_subprocess.go | 8 +-- internal/pkg/otel/manager/manager.go | 16 ++++-- internal/pkg/otel/manager/manager_test.go | 51 ++++++++++++------- 4 files changed, 48 insertions(+), 32 deletions(-) diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index 275de9e4e38..434682e441f 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -1686,10 +1686,7 @@ func (c *Coordinator) processConfigAgent(ctx context.Context, cfg *config.Config } } - if c.vars != nil { - return c.refreshComponentModel(ctx) - } - return nil + return c.refreshComponentModel(ctx) } // Generate the AST for a new incoming configuration and, if successful, diff --git a/internal/pkg/otel/manager/execution_subprocess.go b/internal/pkg/otel/manager/execution_subprocess.go index 58b882fb028..3ed8946b114 100644 --- a/internal/pkg/otel/manager/execution_subprocess.go +++ 
b/internal/pkg/otel/manager/execution_subprocess.go @@ -52,6 +52,9 @@ func newSubprocessExecution(collectorPath string, uuid string, metricsPort int, collectorArgs: []string{ fmt.Sprintf("--%s", OtelSetSupervisedFlagName), fmt.Sprintf("--%s=%s", OtelSupervisedMonitoringURLFlagName, monitoring.EDOTMonitoringEndpoint()), + // edot logger here is being configured to log at debug. + // For more control: use service::telemetry::logs::level + fmt.Sprintf("--%s=%s", OtelSupervisedLoggingLevelFlagName, "debug"), }, healthCheckExtensionID: healthCheckExtensionID, collectorMetricsPort: metricsPort, @@ -128,11 +131,8 @@ func (r *subprocessExecution) startCollector( // Set the environment variable for the collector metrics port. See comment at the constant definition for more information. env = append(env, fmt.Sprintf("%s=%d", componentmonitoring.OtelCollectorMetricsPortEnvVarName, collectorMetricsPort)) - // set collector args - collectorArgs := append(r.collectorArgs, fmt.Sprintf("--%s=%s", OtelSupervisedLoggingLevelFlagName, r.logLevel.String())) - processInfo, err := process.Start(r.collectorPath, - process.WithArgs(collectorArgs), + process.WithArgs(r.collectorArgs), process.WithEnv(env), process.WithCmdOptions(func(c *exec.Cmd) error { c.Stdin = bytes.NewReader(confBytes) diff --git a/internal/pkg/otel/manager/manager.go b/internal/pkg/otel/manager/manager.go index 0e8e7f0f585..a168bd30bdf 100644 --- a/internal/pkg/otel/manager/manager.go +++ b/internal/pkg/otel/manager/manager.go @@ -234,7 +234,6 @@ func (m *OTelManager) Run(ctx context.Context) error { newRetries := m.recoveryRetries.Add(1) m.logger.Infof("collector recovery restarting, total retries: %d", newRetries) - // we use log level set on agentInfo at m.proc, err = m.execution.startCollector(ctx, m.logLevel, m.baseLogger, m.logger, m.mergedCollectorCfg, m.collectorRunErr, collectorStatusCh, forceFetchStatusCh) if err != nil { // report a startup error (this gets reported as status) @@ -312,7 +311,14 @@ func (m 
*OTelManager) Run(ctx context.Context) error { configChanged, configUpdateErr := m.maybeUpdateMergedConfig(mergedCfg) m.collectorCfg = cfgUpdate.collectorCfg m.components = cfgUpdate.components - m.logLevel = cfgUpdate.logLevel.String() + // set the log level defined in the service::telemetry::logs::level setting + if mergedCfg.IsSet("service::telemetry::logs::level") { + m.logLevel = mergedCfg.Get("service::telemetry::logs::level").(string) // we know this will always be a string. Should not panic + } else { + // this condition is only true if mergedCfg is nil + // In that case, use coordinator's log level + m.logLevel = cfgUpdate.logLevel.String() + } m.mx.Unlock() if configUpdateErr != nil { @@ -323,7 +329,7 @@ func (m *OTelManager) Run(ctx context.Context) error { m.logger.Debugf( "new config hash (%d) is different than the old config hash (%d), applying update", m.mergedCollectorCfgHash, previousConfigHash) - applyErr := m.applyMergedConfig(ctx, collectorStatusCh, m.collectorRunErr, forceFetchStatusCh, cfgUpdate.logLevel) + applyErr := m.applyMergedConfig(ctx, collectorStatusCh, m.collectorRunErr, forceFetchStatusCh, m.logLevel) // only report the error if we actually apply the update // otherwise, we could override an actual error with a nil in the channel when the collector // state doesn't actually change @@ -534,7 +540,7 @@ func (m *OTelManager) applyMergedConfig(ctx context.Context, collectorStatusCh chan *status.AggregateStatus, collectorRunErr chan error, forceFetchStatusCh chan struct{}, - logLevel logp.Level, + logLevel string, ) error { if m.proc != nil { m.proc.Stop(m.stopTimeout) @@ -563,7 +569,7 @@ func (m *OTelManager) applyMergedConfig(ctx context.Context, } else { // either a new configuration or the first configuration // that results in the collector being started - proc, err := m.execution.startCollector(ctx, logLevel.String(), m.baseLogger, m.logger, m.mergedCollectorCfg, collectorRunErr, collectorStatusCh, forceFetchStatusCh) + proc, err := 
m.execution.startCollector(ctx, logLevel, m.baseLogger, m.logger, m.mergedCollectorCfg, collectorRunErr, collectorStatusCh, forceFetchStatusCh) if err != nil { // failed to create the collector (this is different then // it's failing to run). we do not retry creation on failure diff --git a/internal/pkg/otel/manager/manager_test.go b/internal/pkg/otel/manager/manager_test.go index a48a7b9f57b..e4a5ef35a04 100644 --- a/internal/pkg/otel/manager/manager_test.go +++ b/internal/pkg/otel/manager/manager_test.go @@ -1225,35 +1225,41 @@ func TestOTelManager_buildMergedConfig(t *testing.T) { components []component.Component expectedKeys []string expectedErrorString string + expectedLogLevel string }{ { - name: "nil config returns nil", - collectorCfg: nil, - components: nil, + name: "nil config returns nil", + collectorCfg: nil, + components: nil, + expectedLogLevel: "", }, { - name: "empty config returns empty config", - collectorCfg: nil, - components: nil, - expectedKeys: []string{}, + name: "empty config returns empty config", + collectorCfg: nil, + components: nil, + expectedKeys: []string{}, + expectedLogLevel: "", }, { - name: "collector config only", - collectorCfg: confmap.NewFromStringMap(testConfig), - components: nil, - expectedKeys: []string{"receivers", "exporters", "service", "processors"}, + name: "collector config only", + collectorCfg: confmap.NewFromStringMap(testConfig), + components: nil, + expectedKeys: []string{"receivers", "exporters", "service", "processors"}, + expectedLogLevel: "info", }, { - name: "components only", - collectorCfg: nil, - components: []component.Component{testComp}, - expectedKeys: []string{"receivers", "exporters", "service"}, + name: "components only", + collectorCfg: nil, + components: []component.Component{testComp}, + expectedKeys: []string{"receivers", "exporters", "service"}, + expectedLogLevel: "DEBUG", }, { - name: "both collector config and components", - collectorCfg: confmap.NewFromStringMap(testConfig), - 
components: []component.Component{testComp}, - expectedKeys: []string{"receivers", "exporters", "service", "processors"}, + name: "both collector config and components", + collectorCfg: confmap.NewFromStringMap(testConfig), + components: []component.Component{testComp}, + expectedKeys: []string{"receivers", "exporters", "service", "processors"}, + expectedLogLevel: "info", }, { name: "component config generation error", @@ -1265,6 +1271,7 @@ func TestOTelManager_buildMergedConfig(t *testing.T) { // Missing InputSpec which should cause an error during config generation }}, expectedErrorString: "failed to generate otel config: unknown otel receiver type for input type: filestream", + expectedLogLevel: "", }, } @@ -1273,6 +1280,7 @@ func TestOTelManager_buildMergedConfig(t *testing.T) { cfgUpdate := configUpdate{ collectorCfg: tt.collectorCfg, components: tt.components, + logLevel: logp.DebugLevel, } result, err := buildMergedConfig(cfgUpdate, commonAgentInfo, commonBeatMonitoringConfigGetter, logptest.NewTestingLogger(t, "")) @@ -1290,6 +1298,11 @@ func TestOTelManager_buildMergedConfig(t *testing.T) { return } + // assert log level provided by user is given precedence. 
+ if tt.expectedLogLevel != "" { + assert.Equal(t, tt.expectedLogLevel, result.Get("service::telemetry::logs::level")) + } + require.NotNil(t, result) for _, key := range tt.expectedKeys { assert.True(t, result.IsSet(key), "Expected key %s to be set", key) From 3ebcca88a0a8c135a0d7e78a9391d4abc0eb4b73 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Wed, 31 Dec 2025 16:58:17 +0530 Subject: [PATCH 13/25] add comment --- internal/pkg/agent/application/coordinator/coordinator.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index 434682e441f..aa81442e973 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -1675,6 +1675,7 @@ func (c *Coordinator) processConfigAgent(ctx context.Context, cfg *config.Config c.currentCfg = currentCfg // check if log level has changed for standalone elastic-agent + // we'd have to update both the periodic and once config watchers and refactor initialization in application.go to do otherwise. 
if c.agentInfo.IsStandalone() { ll := currentCfg.Settings.LoggingConfig.Level if ll != c.state.LogLevel { From ceb7cdb587dfbc8cef23520dc227b07c97cf7adb Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Wed, 31 Dec 2025 17:48:55 +0530 Subject: [PATCH 14/25] fix ci --- internal/pkg/otel/manager/manager.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/internal/pkg/otel/manager/manager.go b/internal/pkg/otel/manager/manager.go index a168bd30bdf..df2ff96d3b2 100644 --- a/internal/pkg/otel/manager/manager.go +++ b/internal/pkg/otel/manager/manager.go @@ -312,11 +312,10 @@ func (m *OTelManager) Run(ctx context.Context) error { m.collectorCfg = cfgUpdate.collectorCfg m.components = cfgUpdate.components // set the log level defined in service::telemetry::log::level setting - if mergedCfg.IsSet("service::telemetry::logs::level") { + if mergedCfg != nil && mergedCfg.IsSet("service::telemetry::logs::level") { m.logLevel = mergedCfg.Get("service::telemetry::logs::level").(string) // we know this always be a string. 
Should not panic } else { - // this condition is only true is mergedCfg is nil - // In that case, use coordinator's log level + // when ergedCfg is nil use coordinator's log level m.logLevel = cfgUpdate.logLevel.String() } m.mx.Unlock() From d511efcffcaf2017086232f7ccaf2fe2697a9b50 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Thu, 1 Jan 2026 07:22:15 +0530 Subject: [PATCH 15/25] address review comments --- internal/pkg/otel/manager/execution_subprocess.go | 10 ++++++---- internal/pkg/otel/manager/manager.go | 11 ++++++----- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/internal/pkg/otel/manager/execution_subprocess.go b/internal/pkg/otel/manager/execution_subprocess.go index 3ed8946b114..a3302868634 100644 --- a/internal/pkg/otel/manager/execution_subprocess.go +++ b/internal/pkg/otel/manager/execution_subprocess.go @@ -52,9 +52,6 @@ func newSubprocessExecution(collectorPath string, uuid string, metricsPort int, collectorArgs: []string{ fmt.Sprintf("--%s", OtelSetSupervisedFlagName), fmt.Sprintf("--%s=%s", OtelSupervisedMonitoringURLFlagName, monitoring.EDOTMonitoringEndpoint()), - // edot logger here is being configured to log at debug. - // For more control: use service::telemetry::logs::level - fmt.Sprintf("--%s=%s", OtelSupervisedLoggingLevelFlagName, "debug"), }, healthCheckExtensionID: healthCheckExtensionID, collectorMetricsPort: metricsPort, @@ -86,6 +83,8 @@ func (r *subprocessExecution) startCollector( statusCh chan *status.AggregateStatus, forceFetchStatusCh chan struct{}, ) (collectorHandle, error) { + // this method unpacks the incoming logger and conditonally sets it on r.logLevel + // use r.logLevel from this point onwards if err := r.setLogLevelFromString(logLevel); err != nil { return nil, fmt.Errorf("failed to set log level for otel collector: %w", err) } @@ -131,8 +130,11 @@ func (r *subprocessExecution) startCollector( // Set the environment variable for the collector metrics port. 
See comment at the constant definition for more information. env = append(env, fmt.Sprintf("%s=%d", componentmonitoring.OtelCollectorMetricsPortEnvVarName, collectorMetricsPort)) + // set collector args + collectorArgs := append(r.collectorArgs, fmt.Sprintf("--%s=%s", OtelSupervisedLoggingLevelFlagName, r.logLevel)) + processInfo, err := process.Start(r.collectorPath, - process.WithArgs(r.collectorArgs), + process.WithArgs(collectorArgs), process.WithEnv(env), process.WithCmdOptions(func(c *exec.Cmd) error { c.Stdin = bytes.NewReader(confBytes) diff --git a/internal/pkg/otel/manager/manager.go b/internal/pkg/otel/manager/manager.go index df2ff96d3b2..1b75afafc86 100644 --- a/internal/pkg/otel/manager/manager.go +++ b/internal/pkg/otel/manager/manager.go @@ -313,9 +313,11 @@ func (m *OTelManager) Run(ctx context.Context) error { m.components = cfgUpdate.components // set the log level defined in service::telemetry::log::level setting if mergedCfg != nil && mergedCfg.IsSet("service::telemetry::logs::level") { - m.logLevel = mergedCfg.Get("service::telemetry::logs::level").(string) // we know this always be a string. 
Should not panic + if logLevel, ok := mergedCfg.Get("service::telemetry::logs::level").(string); ok { + m.logLevel = logLevel + } } else { - // when ergedCfg is nil use coordinator's log level + // when emrgedCfg is nil use coordinator's log level m.logLevel = cfgUpdate.logLevel.String() } m.mx.Unlock() @@ -328,7 +330,7 @@ func (m *OTelManager) Run(ctx context.Context) error { m.logger.Debugf( "new config hash (%d) is different than the old config hash (%d), applying update", m.mergedCollectorCfgHash, previousConfigHash) - applyErr := m.applyMergedConfig(ctx, collectorStatusCh, m.collectorRunErr, forceFetchStatusCh, m.logLevel) + applyErr := m.applyMergedConfig(ctx, collectorStatusCh, m.collectorRunErr, forceFetchStatusCh) // only report the error if we actually apply the update // otherwise, we could override an actual error with a nil in the channel when the collector // state doesn't actually change @@ -539,7 +541,6 @@ func (m *OTelManager) applyMergedConfig(ctx context.Context, collectorStatusCh chan *status.AggregateStatus, collectorRunErr chan error, forceFetchStatusCh chan struct{}, - logLevel string, ) error { if m.proc != nil { m.proc.Stop(m.stopTimeout) @@ -568,7 +569,7 @@ func (m *OTelManager) applyMergedConfig(ctx context.Context, } else { // either a new configuration or the first configuration // that results in the collector being started - proc, err := m.execution.startCollector(ctx, logLevel, m.baseLogger, m.logger, m.mergedCollectorCfg, collectorRunErr, collectorStatusCh, forceFetchStatusCh) + proc, err := m.execution.startCollector(ctx, m.logLevel, m.baseLogger, m.logger, m.mergedCollectorCfg, collectorRunErr, collectorStatusCh, forceFetchStatusCh) if err != nil { // failed to create the collector (this is different then // it's failing to run). 
we do not retry creation on failure From 5765ff1850cf6a0903900f6486c5df2749dd91c7 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Thu, 1 Jan 2026 09:23:46 +0530 Subject: [PATCH 16/25] fix ci --- NOTICE-fips.txt | 2 +- NOTICE.txt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/NOTICE-fips.txt b/NOTICE-fips.txt index f058fbc9aca..ac17ad5384d 100644 --- a/NOTICE-fips.txt +++ b/NOTICE-fips.txt @@ -1,5 +1,5 @@ Elastic Beats -Copyright 2014-2025 Elasticsearch BV +Copyright 2014-2026 Elasticsearch BV This product includes software developed by The Apache Software Foundation (http://www.apache.org/). diff --git a/NOTICE.txt b/NOTICE.txt index 2e8cf6e68a0..f9326801715 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -1,5 +1,5 @@ Elastic Beats -Copyright 2014-2025 Elasticsearch BV +Copyright 2014-2026 Elasticsearch BV This product includes software developed by The Apache Software Foundation (http://www.apache.org/). From f200cd4974654d868e2436f5f2ea4562eca43783 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Fri, 2 Jan 2026 15:36:04 +0530 Subject: [PATCH 17/25] add integration test --- testing/integration/ess/otel_test.go | 164 ++++++++++++++++++++++++++- 1 file changed, 162 insertions(+), 2 deletions(-) diff --git a/testing/integration/ess/otel_test.go b/testing/integration/ess/otel_test.go index ad79dcd1cda..d1076fce086 100644 --- a/testing/integration/ess/otel_test.go +++ b/testing/integration/ess/otel_test.go @@ -2,8 +2,6 @@ // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. 
-//go:build integration - package ess import ( @@ -23,6 +21,9 @@ import ( "time" "github.com/elastic/elastic-agent/pkg/control/v2/cproto" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" @@ -2038,3 +2039,162 @@ agent.internal.runtime.metricbeat: combinedOutput, err := fixture.Uninstall(ctx, &aTesting.UninstallOpts{Force: true}) require.NoErrorf(t, err, "error uninstalling classic agent monitoring, err: %s, combined output: %s", err, string(combinedOutput)) } + +func TestLogReloading(t *testing.T) { + define.Require(t, define.Requirements{ + Group: integration.Default, + Local: true, + Stack: &define.Stack{}, + }) + + // Flow of the test + // 1. Start elastic-agent with debug logs + // 2. Change the log level to info without restarting + // 3. Ensure no debug logs are printed + // 4. Set service::telemetry::logs::level: debug + // 5. Ensure service::telemetry::logs::level is given precedence even when agent logs are set to info + + // Create the otel configuration file + type otelConfigOptions struct { + ESEndpoint string + ESApiKey string + Index string + CAFile string + } + + fixture, err := define.NewFixtureFromLocalBuild(t, define.Version()) + require.NoError(t, err) + + ctx, cancel := testcontext.WithDeadline(t, t.Context(), time.Now().Add(5*time.Minute)) + defer cancel() + err = fixture.Prepare(ctx) + require.NoError(t, err) + + logConfig := ` +outputs: + default: + type: elasticsearch + hosts: + - %s + preset: balanced + protocol: http +agent.grpc.port: 6792 +agent.logging.level: %s +agent.monitoring.enabled: true +agent.logging.to_stderr: true +agent.reload: + period: 1s +` + + esURL := integration.StartMockES(t, 0, 0, 0, 0) + // start with debug logs + cfg := fmt.Sprintf(logConfig, esURL, "debug") + + require.NoError(t, fixture.Configure(ctx, []byte(cfg))) + + cmd, err := fixture.PrepareAgentCommand(ctx, nil) + if err != nil { + t.Fatalf("cannot prepare 
Elastic-Agent command: %s", err) + } + + observer, zapLogs := observer.New(zap.DebugLevel) + logger := zap.New(observer) + zapWriter := &ZapWriter{logger: logger, level: zap.InfoLevel} + cmd.Stderr = zapWriter + cmd.Stdout = zapWriter + + require.NoError(t, cmd.Start()) + + require.Eventually(t, func() bool { + err = fixture.IsHealthy(ctx) + if err != nil { + t.Logf("waiting for agent healthy: %s", err.Error()) + return false + } + return true + }, 30*time.Second, 1*time.Second) + + // Make sure the Elastic-Agent process is not running before + // exiting the test + t.Cleanup(func() { + // Ignore the error because we cancelled the context, + // and that always returns an error + _ = cmd.Wait() + if t.Failed() { + t.Log("Elastic-Agent output:") + } + }) + + require.Eventually(t, func() bool { + // we ensure OTel runtime inputs have started with correct level + // and not just agent logs + return (zapLogs.FilterMessageSnippet("otelcol.component.kind").FilterMessageSnippet(`"log.level":"debug"`).Len() > 1) + }, 1*time.Minute, 10*time.Second, "could not find debug logs") + + // set agent.logging.level: info + cfg = fmt.Sprintf(logConfig, esURL, "info") + require.NoError(t, fixture.Configure(ctx, []byte(cfg))) + + // wait for log level reload + require.Eventually(t, func() bool { + return zapLogs.FilterMessageSnippet("log level changed to info").Len() > 0 + }, 1*time.Minute, 10*time.Second, "log level did not change") + + // reset logs + zapLogs.TakeAll() + + // wait for elastic agent to be healthy and OTel collector to start + require.Eventually(t, func() bool { + err = fixture.IsHealthy(ctx) + if err != nil { + t.Logf("waiting for agent healthy: %s", err.Error()) + return false + } + return zapLogs.FilterMessageSnippet("Everything is ready. 
Begin running and processing data").Len() > 0 + }, 30*time.Second, 1*time.Second) + + require.Zero(t, zapLogs.FilterMessageSnippet(`"log.level:debug"`).Len()) + + // set collector logs to debug + logConfig = logConfig + ` +service: + telemetry: + logs: + level: debug +` + // reset zap logs + zapLogs.TakeAll() + + // add service::telemetry::logs::level:debug + cfg = fmt.Sprintf(logConfig, esURL, "info") + require.NoError(t, fixture.Configure(ctx, []byte(cfg))) + + // wait for elastic agent to be healthy and OTel collector to re-start + require.Eventually(t, func() bool { + err = fixture.IsHealthy(ctx) + if err != nil { + t.Logf("waiting for agent healthy: %s", err.Error()) + return false + } + return zapLogs.FilterMessageSnippet("Everything is ready. Begin running and processing data").Len() > 0 + }, 1*time.Minute, 10*time.Second, "elastic-agent is not healthy") + + require.Eventually(t, func() bool { + // we ensure inputs have reloaded with correct level + // and not just agent logs + return (zapLogs.FilterMessageSnippet("otelcol.component.kind").FilterMessageSnippet(`"log.level":"debug"`).Len() > 1) + }, 1*time.Minute, 10*time.Second, "collector setting for log level was not given precedence") +} + +type ZapWriter struct { + logger *zap.Logger + level zapcore.Level +} + +func (w *ZapWriter) Write(p []byte) (n int, err error) { + msg := strings.TrimSpace(string(p)) + if msg != "" { + w.logger.Check(w.level, msg).Write() + } + return len(p), nil +} From 1c4b57fa6a6f755eed5ecb5e89d071d8854be0ac Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Fri, 2 Jan 2026 15:36:46 +0530 Subject: [PATCH 18/25] add back build tag --- testing/integration/ess/otel_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/testing/integration/ess/otel_test.go b/testing/integration/ess/otel_test.go index d1076fce086..4075e66ec78 100644 --- a/testing/integration/ess/otel_test.go +++ b/testing/integration/ess/otel_test.go @@ -2,6 +2,8 @@ // or more contributor license 
agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. +//go:build integration + package ess import ( @@ -2078,7 +2080,6 @@ outputs: - %s preset: balanced protocol: http -agent.grpc.port: 6792 agent.logging.level: %s agent.monitoring.enabled: true agent.logging.to_stderr: true From 0833da4d37a4fb941d41d69accc427591a77c2c1 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Fri, 2 Jan 2026 15:43:17 +0530 Subject: [PATCH 19/25] add comment --- testing/integration/ess/otel_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/testing/integration/ess/otel_test.go b/testing/integration/ess/otel_test.go index 4075e66ec78..99180480aa4 100644 --- a/testing/integration/ess/otel_test.go +++ b/testing/integration/ess/otel_test.go @@ -2042,6 +2042,7 @@ agent.internal.runtime.metricbeat: require.NoErrorf(t, err, "error uninstalling classic agent monitoring, err: %s, combined output: %s", err, string(combinedOutput)) } +// This tests that live reloading the log level works correctly func TestLogReloading(t *testing.T) { define.Require(t, define.Requirements{ Group: integration.Default, From de36b8d8f5bbbf607235a4dd8ed2f81417af0bf8 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Fri, 2 Jan 2026 18:07:18 +0530 Subject: [PATCH 20/25] fix ci --- testing/integration/ess/otel_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/testing/integration/ess/otel_test.go b/testing/integration/ess/otel_test.go index 99180480aa4..0eab9efd0b7 100644 --- a/testing/integration/ess/otel_test.go +++ b/testing/integration/ess/otel_test.go @@ -22,7 +22,6 @@ import ( "text/template" "time" - "github.com/elastic/elastic-agent/pkg/control/v2/cproto" "go.uber.org/zap" "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest/observer" @@ -35,6 +34,7 @@ import ( "github.com/elastic/elastic-agent-libs/testing/estools" "github.com/elastic/elastic-agent-libs/transport/tlscommontest" 
"github.com/elastic/elastic-agent/pkg/control/v2/client" + "github.com/elastic/elastic-agent/pkg/control/v2/cproto" aTesting "github.com/elastic/elastic-agent/pkg/testing" "github.com/elastic/elastic-agent/pkg/testing/define" "github.com/elastic/elastic-agent/pkg/testing/tools/testcontext" @@ -2068,7 +2068,7 @@ func TestLogReloading(t *testing.T) { fixture, err := define.NewFixtureFromLocalBuild(t, define.Version()) require.NoError(t, err) - ctx, cancel := testcontext.WithDeadline(t, t.Context(), time.Now().Add(5*time.Minute)) + ctx, cancel := testcontext.WithDeadline(t, t.Context(), time.Now().Add(10*time.Minute)) defer cancel() err = fixture.Prepare(ctx) require.NoError(t, err) @@ -2153,7 +2153,7 @@ agent.reload: return false } return zapLogs.FilterMessageSnippet("Everything is ready. Begin running and processing data").Len() > 0 - }, 30*time.Second, 1*time.Second) + }, 1*time.Minute, 10*time.Second, "elastic-agent was not healthy after log level changed to info") require.Zero(t, zapLogs.FilterMessageSnippet(`"log.level:debug"`).Len()) From d26711d69d1b9238bdd87f7c565588cec6e8915d Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Fri, 2 Jan 2026 20:53:51 +0530 Subject: [PATCH 21/25] fix ci --- testing/integration/ess/otel_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/testing/integration/ess/otel_test.go b/testing/integration/ess/otel_test.go index 0eab9efd0b7..158cabf01e0 100644 --- a/testing/integration/ess/otel_test.go +++ b/testing/integration/ess/otel_test.go @@ -2068,7 +2068,7 @@ func TestLogReloading(t *testing.T) { fixture, err := define.NewFixtureFromLocalBuild(t, define.Version()) require.NoError(t, err) - ctx, cancel := testcontext.WithDeadline(t, t.Context(), time.Now().Add(10*time.Minute)) + ctx, cancel := testcontext.WithDeadline(t, t.Context(), time.Now().Add(5*time.Minute)) defer cancel() err = fixture.Prepare(ctx) require.NoError(t, err) @@ -2082,6 +2082,7 @@ outputs: preset: balanced protocol: http 
agent.logging.level: %s +agent.grpc.port: 6793 agent.monitoring.enabled: true agent.logging.to_stderr: true agent.reload: @@ -2124,6 +2125,7 @@ agent.reload: _ = cmd.Wait() if t.Failed() { t.Log("Elastic-Agent output:") + zapLogs.All() } }) @@ -2152,7 +2154,6 @@ agent.reload: t.Logf("waiting for agent healthy: %s", err.Error()) return false } - return zapLogs.FilterMessageSnippet("Everything is ready. Begin running and processing data").Len() > 0 }, 1*time.Minute, 10*time.Second, "elastic-agent was not healthy after log level changed to info") require.Zero(t, zapLogs.FilterMessageSnippet(`"log.level:debug"`).Len()) @@ -2178,7 +2179,6 @@ service: t.Logf("waiting for agent healthy: %s", err.Error()) return false } - return zapLogs.FilterMessageSnippet("Everything is ready. Begin running and processing data").Len() > 0 }, 1*time.Minute, 10*time.Second, "elastic-agent is not healthy") require.Eventually(t, func() bool { From 6044a3eea62893e79763a46f2ef1f14ab8653b09 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Fri, 2 Jan 2026 21:27:32 +0530 Subject: [PATCH 22/25] fix ci --- testing/integration/ess/otel_test.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/testing/integration/ess/otel_test.go b/testing/integration/ess/otel_test.go index 158cabf01e0..674de508d1a 100644 --- a/testing/integration/ess/otel_test.go +++ b/testing/integration/ess/otel_test.go @@ -2139,11 +2139,6 @@ agent.reload: cfg = fmt.Sprintf(logConfig, esURL, "info") require.NoError(t, fixture.Configure(ctx, []byte(cfg))) - // wait for log level reload - require.Eventually(t, func() bool { - return zapLogs.FilterMessageSnippet("log level changed to info").Len() > 0 - }, 1*time.Minute, 10*time.Second, "log level did not change") - // reset logs zapLogs.TakeAll() @@ -2154,6 +2149,7 @@ agent.reload: t.Logf("waiting for agent healthy: %s", err.Error()) return false } + return zapLogs.FilterMessageSnippet("Everything is ready. 
Begin running and processing data").Len() > 0 }, 1*time.Minute, 10*time.Second, "elastic-agent was not healthy after log level changed to info") require.Zero(t, zapLogs.FilterMessageSnippet(`"log.level:debug"`).Len()) @@ -2165,13 +2161,14 @@ service: logs: level: debug ` - // reset zap logs - zapLogs.TakeAll() // add service::telemetry::logs::level:debug cfg = fmt.Sprintf(logConfig, esURL, "info") require.NoError(t, fixture.Configure(ctx, []byte(cfg))) + // reset zap logs + zapLogs.TakeAll() + // wait for elastic agent to be healthy and OTel collector to re-start require.Eventually(t, func() bool { err = fixture.IsHealthy(ctx) @@ -2179,6 +2176,7 @@ service: t.Logf("waiting for agent healthy: %s", err.Error()) return false } + return zapLogs.FilterMessageSnippet("Everything is ready. Begin running and processing data").Len() > 0 }, 1*time.Minute, 10*time.Second, "elastic-agent is not healthy") require.Eventually(t, func() bool { From eeca2e6524e53ab066969843f045cab4be677b6e Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Sat, 3 Jan 2026 07:05:08 +0530 Subject: [PATCH 23/25] avoid any timing issue and fix ci --- testing/integration/ess/otel_test.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/testing/integration/ess/otel_test.go b/testing/integration/ess/otel_test.go index 674de508d1a..76cdd848306 100644 --- a/testing/integration/ess/otel_test.go +++ b/testing/integration/ess/otel_test.go @@ -2135,13 +2135,13 @@ agent.reload: return (zapLogs.FilterMessageSnippet("otelcol.component.kind").FilterMessageSnippet(`"log.level":"debug"`).Len() > 1) }, 1*time.Minute, 10*time.Second, "could not find debug logs") + // reset logs + zapLogs.TakeAll() + // set agent.logging.level: info cfg = fmt.Sprintf(logConfig, esURL, "info") require.NoError(t, fixture.Configure(ctx, []byte(cfg))) - // reset logs - zapLogs.TakeAll() - // wait for elastic agent to be healthy and OTel collector to start require.Eventually(t, func() bool { err = 
fixture.IsHealthy(ctx) @@ -2152,7 +2152,11 @@ agent.reload: return zapLogs.FilterMessageSnippet("Everything is ready. Begin running and processing data").Len() > 0 }, 1*time.Minute, 10*time.Second, "elastic-agent was not healthy after log level changed to info") - require.Zero(t, zapLogs.FilterMessageSnippet(`"log.level:debug"`).Len()) + // reset logs because log level may not have taken effect between shutdown and restart + zapLogs.TakeAll() + // wait for logs from OTel inputs to populate. + time.Sleep(30 * time.Second) + require.Zero(t, zapLogs.FilterMessageSnippet(`"log.level":"debug"`).Len()) // set collector logs to debug logConfig = logConfig + ` From 4316f41567c4643b44410cc29c7117815e44918e Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Tue, 6 Jan 2026 08:18:34 +0530 Subject: [PATCH 24/25] address review comments --- internal/pkg/agent/cmd/run.go | 3 +++ .../pkg/otel/manager/execution_subprocess.go | 27 +++++-------------- internal/pkg/otel/manager/manager.go | 5 ++-- testing/integration/ess/otel_test.go | 13 ++++++--- 4 files changed, 22 insertions(+), 26 deletions(-) diff --git a/internal/pkg/agent/cmd/run.go b/internal/pkg/agent/cmd/run.go index 7b5209f6bd7..71131d711c9 100644 --- a/internal/pkg/agent/cmd/run.go +++ b/internal/pkg/agent/cmd/run.go @@ -353,6 +353,9 @@ func runElasticAgent( errors.M(errors.MetaKeyPath, paths.AgentConfigFile())) } + // Set the initial log level (either default or from config file) + logger.SetLevel(logLvl) + // Ensure that the log level now matches what is configured in the agentInfo. 
var lvl logp.Level err = lvl.Unpack(agentInfo.LogLevel()) diff --git a/internal/pkg/otel/manager/execution_subprocess.go b/internal/pkg/otel/manager/execution_subprocess.go index a3302868634..09a15e4841f 100644 --- a/internal/pkg/otel/manager/execution_subprocess.go +++ b/internal/pkg/otel/manager/execution_subprocess.go @@ -64,7 +64,6 @@ func newSubprocessExecution(collectorPath string, uuid string, metricsPort int, type subprocessExecution struct { collectorPath string collectorArgs []string - logLevel logp.Level healthCheckExtensionID string collectorMetricsPort int collectorHealthCheckPort int @@ -83,10 +82,10 @@ func (r *subprocessExecution) startCollector( statusCh chan *status.AggregateStatus, forceFetchStatusCh chan struct{}, ) (collectorHandle, error) { - // this method unpacks the incoming logger and conditonally sets it on r.logLevel - // use r.logLevel from this point onwards - if err := r.setLogLevelFromString(logLevel); err != nil { - return nil, fmt.Errorf("failed to set log level for otel collector: %w", err) + var lvl logp.Level + err := lvl.Unpack(logLevel) + if err != nil { + return nil, fmt.Errorf("failed to unpack the log level '%s': %w", logLevel, err) } if cfg == nil { @@ -120,10 +119,10 @@ func (r *subprocessExecution) startCollector( } stdOutLast := newZapLast(baseLogger.Core()) - stdOut := runtimeLogger.NewLogWriterWithDefaults(stdOutLast, zapcore.Level(r.logLevel)) + stdOut := runtimeLogger.NewLogWriterWithDefaults(stdOutLast, zapcore.Level(lvl)) // info level for stdErr because by default collector writes to stderr stdErrLast := newZapLast(baseLogger.Core()) - stdErr := runtimeLogger.NewLogWriterWithDefaults(stdErrLast, zapcore.Level(r.logLevel)) + stdErr := runtimeLogger.NewLogWriterWithDefaults(stdErrLast, zapcore.Level(lvl)) procCtx, procCtxCancel := context.WithCancel(ctx) env := os.Environ() @@ -131,7 +130,7 @@ func (r *subprocessExecution) startCollector( env = append(env, fmt.Sprintf("%s=%d", 
componentmonitoring.OtelCollectorMetricsPortEnvVarName, collectorMetricsPort)) // set collector args - collectorArgs := append(r.collectorArgs, fmt.Sprintf("--%s=%s", OtelSupervisedLoggingLevelFlagName, r.logLevel)) + collectorArgs := append(r.collectorArgs, fmt.Sprintf("--%s=%s", OtelSupervisedLoggingLevelFlagName, lvl)) processInfo, err := process.Start(r.collectorPath, process.WithArgs(collectorArgs), @@ -259,18 +258,6 @@ func (r *subprocessExecution) startCollector( return ctl, nil } -func (r *subprocessExecution) setLogLevelFromString(logLevel string) error { - var lvl logp.Level - err := lvl.Unpack(logLevel) - if err != nil { - return fmt.Errorf("invalid log level '%s': %w", logLevel, err) - } - if r.logLevel != lvl { - r.logLevel = lvl - } - return nil -} - // cloneCollectorStatus creates a deep copy of the provided AggregateStatus. func cloneCollectorStatus(aStatus *status.AggregateStatus) *status.AggregateStatus { if aStatus == nil { diff --git a/internal/pkg/otel/manager/manager.go b/internal/pkg/otel/manager/manager.go index 1b75afafc86..0117c9cc2c4 100644 --- a/internal/pkg/otel/manager/manager.go +++ b/internal/pkg/otel/manager/manager.go @@ -315,9 +315,11 @@ func (m *OTelManager) Run(ctx context.Context) error { if mergedCfg != nil && mergedCfg.IsSet("service::telemetry::logs::level") { if logLevel, ok := mergedCfg.Get("service::telemetry::logs::level").(string); ok { m.logLevel = logLevel + } else { + m.logger.Warn("failed to access log level from service::telemetry::logs::level") } } else { - // when emrgedCfg is nil use coordinator's log level + // when mergedCfg is nil use coordinator's log level m.logLevel = cfgUpdate.logLevel.String() } m.mx.Unlock() @@ -389,7 +391,6 @@ func buildMergedConfig( return nil, fmt.Errorf("failed to generate otel config: %w", err) } - // get log level from agent info level := translate.GetOTelLogLevel(cfgUpdate.logLevel.String()) if err := 
componentOtelCfg.Merge(confmap.NewFromStringMap(map[string]any{"service::telemetry::logs::level": level})); err != nil { return nil, fmt.Errorf("failed to set log level in otel config: %w", err) diff --git a/testing/integration/ess/otel_test.go b/testing/integration/ess/otel_test.go index 76cdd848306..5dbc79c52fc 100644 --- a/testing/integration/ess/otel_test.go +++ b/testing/integration/ess/otel_test.go @@ -2142,6 +2142,14 @@ agent.reload: cfg = fmt.Sprintf(logConfig, esURL, "info") require.NoError(t, fixture.Configure(ctx, []byte(cfg))) + // wait log for collector to re-start + require.Eventually(t, func() bool { + return (zapLogs.FilterMessageSnippet("supervised collector started with pid").Len() > 0) + }, 1*time.Minute, 10*time.Second, "could not find debug logs") + + // we reset logs here because between shutdown and re-start the log level may not have taken effect + zapLogs.TakeAll() + // wait for elastic agent to be healthy and OTel collector to start require.Eventually(t, func() bool { err = fixture.IsHealthy(ctx) @@ -2152,10 +2160,7 @@ agent.reload: return zapLogs.FilterMessageSnippet("Everything is ready. Begin running and processing data").Len() > 0 }, 1*time.Minute, 10*time.Second, "elastic-agent was not healthy after log level changed to info") - // reset logs because log level may not have taken effect between shutdown and restart - zapLogs.TakeAll() - // wait for logs from OTel inputs to populate. 
- time.Sleep(30 * time.Second) + // we know for sure that between collector start -> healthy state there are debug log statements require.Zero(t, zapLogs.FilterMessageSnippet(`"log.level":"debug"`).Len()) // set collector logs to debug From 992db447d2d99e7b4d9da981f23d5f2932ab13b6 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Tue, 6 Jan 2026 11:51:05 +0530 Subject: [PATCH 25/25] use log statement --- testing/integration/ess/otel_test.go | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/testing/integration/ess/otel_test.go b/testing/integration/ess/otel_test.go index 5dbc79c52fc..e57d3757571 100644 --- a/testing/integration/ess/otel_test.go +++ b/testing/integration/ess/otel_test.go @@ -2142,14 +2142,6 @@ agent.reload: cfg = fmt.Sprintf(logConfig, esURL, "info") require.NoError(t, fixture.Configure(ctx, []byte(cfg))) - // wait log for collector to re-start - require.Eventually(t, func() bool { - return (zapLogs.FilterMessageSnippet("supervised collector started with pid").Len() > 0) - }, 1*time.Minute, 10*time.Second, "could not find debug logs") - - // we reset logs here because between shutdown and re-start the log level may not have taken effect - zapLogs.TakeAll() - // wait for elastic agent to be healthy and OTel collector to start require.Eventually(t, func() bool { err = fixture.IsHealthy(ctx) @@ -2160,8 +2152,8 @@ agent.reload: return zapLogs.FilterMessageSnippet("Everything is ready. Begin running and processing data").Len() > 0 }, 1*time.Minute, 10*time.Second, "elastic-agent was not healthy after log level changed to info") - // we know for sure that between collector start -> healthy state there are debug log statements - require.Zero(t, zapLogs.FilterMessageSnippet(`"log.level":"debug"`).Len()) + // if debug level was enabled, we would fine this message + require.Zero(t, zapLogs.FilterMessageSnippet(`Starting health check extension V2`).Len()) // set collector logs to debug logConfig = logConfig + `