From fe09c1f4a606c58a9c9c8c17400d78dbe3d7cd8b Mon Sep 17 00:00:00 2001 From: Bejal Lewis <164711649+blewis12@users.noreply.github.com> Date: Thu, 12 Feb 2026 13:21:07 +0100 Subject: [PATCH 01/13] initial impl --- extension/alloyengine/extension.go | 40 ++++++++++++++++++++++++------ internal/alloycli/cmd_run.go | 6 +++++ internal/readyctx/readyctx.go | 21 ++++++++++++++++ 3 files changed, 60 insertions(+), 7 deletions(-) create mode 100644 internal/readyctx/readyctx.go diff --git a/extension/alloyengine/extension.go b/extension/alloyengine/extension.go index 98f9582573e..3aaf276661b 100644 --- a/extension/alloyengine/extension.go +++ b/extension/alloyengine/extension.go @@ -12,6 +12,7 @@ import ( "go.uber.org/zap" "github.com/grafana/alloy/flowcmd" + "github.com/grafana/alloy/internal/readyctx" "github.com/spf13/cobra" ) @@ -21,21 +22,27 @@ type state int var ( stateNotStarted state = 0 - stateRunning state = 1 - stateShuttingDown state = 2 - stateTerminated state = 3 + stateStarting state = 1 + stateRunning state = 2 + stateShuttingDown state = 3 + stateTerminated state = 4 + stateRunError state = 5 ) func (s state) String() string { switch s { case stateNotStarted: return "not_started" + case stateStarting: + return "starting" case stateRunning: return "running" case stateShuttingDown: return "shutting_down" case stateTerminated: return "terminated" + case stateRunError: + return "run_error" } return fmt.Sprintf("unknown_state_%d", s) } @@ -62,6 +69,15 @@ func newAlloyEngineExtension(config *Config, settings component.TelemetrySetting } } +func (e *alloyEngineExtension) isUp() int64 { + e.stateMutex.Lock() + defer e.stateMutex.Unlock() + if e.state == stateRunning { + return 1 + } + return 0 +} + // Start is called when the extension is started. func (e *alloyEngineExtension) Start(ctx context.Context, host component.Host) error { e.stateMutex.Lock() @@ -78,6 +94,13 @@ func (e *alloyEngineExtension) Start(ctx context.Context, host component.Host) e e.runCancel = runCancel e.runExited = make(chan struct{}) + runCtx = readyctx.WithOnReady(runCtx, func() { + e.stateMutex.Lock() + defer e.stateMutex.Unlock() + e.state = stateRunning + e.settings.Logger.Info("alloyengine extension started successfully") + }) + runCommand := e.runCommandFactory() runCommand.SetArgs([]string{e.config.AlloyConfig.File}) err := runCommand.ParseFlags(e.config.flagsAsSlice()) @@ -85,6 +108,9 @@ func (e *alloyEngineExtension) Start(ctx context.Context, host component.Host) e return fmt.Errorf("failed to parse flags: %w", err) } + e.state = stateStarting + e.settings.Logger.Debug("Alloy Engine Extension starting...") + go func() { defer close(e.runExited) @@ -101,9 +127,6 @@ func (e *alloyEngineExtension) Start(ctx context.Context, host component.Host) e e.settings.Logger.Warn("run command exited with an error during shutdown", zap.Error(err)) } }() - - e.state = stateRunning - e.settings.Logger.Info("alloyengine extension started successfully") return nil } @@ -120,6 +143,9 @@ func (e *alloyEngineExtension) runWithBackoffRetry(runCommand *cobra.Command, ct } lastError = err + e.stateMutex.Lock() + e.state = stateRunError + e.stateMutex.Unlock() // exponential backoff until 15s delay := 15 * time.Second @@ -176,7 +202,7 @@ func (e *alloyEngineExtension) Ready() error { defer e.stateMutex.Unlock() switch e.state { - case stateRunning: + case stateStarting, stateRunning, stateRunError: return nil default: return fmt.Errorf("alloyengine extension not ready in current state: %s", e.state.String()) diff --git a/internal/alloycli/cmd_run.go b/internal/alloycli/cmd_run.go index 013a0347bd6..a27fd230f07 100644 --- a/internal/alloycli/cmd_run.go +++ b/internal/alloycli/cmd_run.go @@ -34,6 +34,7 @@ import ( "github.com/grafana/alloy/internal/converter" convert_diag "github.com/grafana/alloy/internal/converter/diag" "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/internal/readyctx" alloy_runtime "github.com/grafana/alloy/internal/runtime" "github.com/grafana/alloy/internal/runtime/logging" "github.com/grafana/alloy/internal/runtime/logging/level" @@ -459,6 +460,11 @@ func (fr *alloyRun) Run(cmd *cobra.Command, configPath string) error { return err } + // Signal to the caller (e.g. alloyengine extension) that the default engine is running + if fn, ok := readyctx.OnReadyFromContext(ctx); ok && fn != nil { + fn() + } + // By now, have either joined or started a new cluster. // Nodes initially join in the Viewer state. After the graph has been // loaded successfully, we can move to the Participant state to signal that diff --git a/internal/readyctx/readyctx.go b/internal/readyctx/readyctx.go new file mode 100644 index 00000000000..69a4a2128ef --- /dev/null +++ b/internal/readyctx/readyctx.go @@ -0,0 +1,21 @@ +package readyctx + +import "context" + +type ctxKey struct{} + +func WithOnReady(ctx context.Context, fn func()) context.Context { + if fn == nil { + return ctx + } + return context.WithValue(ctx, ctxKey{}, fn) +} + +func OnReadyFromContext(ctx context.Context) (fn func(), ok bool) { + v := ctx.Value(ctxKey{}) + if v == nil { + return nil, false + } + fn, ok = v.(func()) + return fn, ok +} From 81632926ced9e03f11afdf89091bda08813524bc Mon Sep 17 00:00:00 2001 From: Bejal Lewis <164711649+blewis12@users.noreply.github.com> Date: Mon, 16 Feb 2026 11:48:28 +0100 Subject: [PATCH 02/13] log whenever we change state --- extension/alloyengine/extension.go | 33 ++++++++++++++++-------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/extension/alloyengine/extension.go b/extension/alloyengine/extension.go index 3aaf276661b..cbac470fdef 100644 --- a/extension/alloyengine/extension.go +++ b/extension/alloyengine/extension.go @@ -29,6 +29,16 @@ var ( stateRunError state = 5 ) +func (e *alloyEngineExtension) setState(newState state) { + e.stateMutex.Lock() + defer e.stateMutex.Unlock() + oldState := e.state + e.state = newState + if oldState != newState { + e.settings.Logger.Info("alloyengine extension state changed", zap.String("from", oldState.String()), zap.String("to", newState.String())) + } +} + func (s state) String() string { switch s { case stateNotStarted: @@ -81,24 +91,21 @@ func (e *alloyEngineExtension) isUp() int64 { // Start is called when the extension is started. func (e *alloyEngineExtension) Start(ctx context.Context, host component.Host) error { e.stateMutex.Lock() - defer e.stateMutex.Unlock() - switch e.state { case stateNotStarted: break default: + e.stateMutex.Unlock() return fmt.Errorf("cannot start alloyengine extension in current state: %s", e.state.String()) } + e.stateMutex.Unlock() runCtx, runCancel := context.WithCancel(context.Background()) e.runCancel = runCancel e.runExited = make(chan struct{}) runCtx = readyctx.WithOnReady(runCtx, func() { - e.stateMutex.Lock() - defer e.stateMutex.Unlock() - e.state = stateRunning - e.settings.Logger.Info("alloyengine extension started successfully") + e.setState(stateRunning) }) runCommand := e.runCommandFactory() @@ -108,8 +115,7 @@ func (e *alloyEngineExtension) Start(ctx context.Context, host component.Host) e return fmt.Errorf("failed to parse flags: %w", err) } - e.state = stateStarting - e.settings.Logger.Debug("Alloy Engine Extension starting...") + e.setState(stateStarting) go func() { defer close(e.runExited) @@ -118,8 +124,8 @@ func (e *alloyEngineExtension) Start(ctx context.Context, host component.Host) e e.stateMutex.Lock() previousState := e.state - e.state = stateTerminated e.stateMutex.Unlock() + e.setState(stateTerminated) if err == nil { e.settings.Logger.Debug("run command exited successfully without error") @@ -143,9 +149,7 @@ func (e *alloyEngineExtension) runWithBackoffRetry(runCommand *cobra.Command, ct } lastError = err - e.stateMutex.Lock() - e.state = stateRunError - e.stateMutex.Unlock() + e.setState(stateRunError) // exponential backoff until 15s delay := 15 * time.Second @@ -172,11 +176,10 @@ func (e *alloyEngineExtension) Shutdown(ctx context.Context) error { switch e.state { case stateRunning: e.settings.Logger.Info("alloyengine extension shutting down") - e.state = stateShuttingDown + e.stateMutex.Unlock() + e.setState(stateShuttingDown) // guaranteed to be non-nil since we are in stateRunning e.runCancel() - // unlock so that the run goroutine can transition to terminated - e.stateMutex.Unlock() select { case <-e.runExited: From f2be64c6f3441f36031a26d7d6e54078e17c38c6 Mon Sep 17 00:00:00 2001 From: Bejal Lewis <164711649+blewis12@users.noreply.github.com> Date: Mon, 16 Feb 2026 12:43:10 +0100 Subject: [PATCH 03/13] Ensure that tests account for ctx callback --- extension/alloyengine/extension_test.go | 35 ++++++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/extension/alloyengine/extension_test.go b/extension/alloyengine/extension_test.go index 6cfec0e5464..63eb0945728 100644 --- a/extension/alloyengine/extension_test.go +++ b/extension/alloyengine/extension_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/grafana/alloy/internal/readyctx" "github.com/spf13/cobra" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" @@ -33,6 +34,19 @@ func newTestExtension(t *testing.T, factory func() *cobra.Command, config *Confi // blockingCommand returns a cobra command that blocks until the context is cancelled, then returns nil. func blockingCommand() *cobra.Command { + return &cobra.Command{ + RunE: func(cmd *cobra.Command, args []string) error { + if fn, ok := readyctx.OnReadyFromContext(cmd.Context()); ok && fn != nil { + fn() + } + <-cmd.Context().Done() + return nil + }, + } +} + +// blockingCommandWithoutReady blocks until context cancellation but never calls the ready callback. +func blockingCommandWithoutReady() *cobra.Command { return &cobra.Command{ RunE: func(cmd *cobra.Command, args []string) error { <-cmd.Context().Done() @@ -45,6 +59,9 @@ func blockingCommand() *cobra.Command { func shutdownErrorCommand(err error) *cobra.Command { return &cobra.Command{ RunE: func(cmd *cobra.Command, args []string) error { + if fn, ok := readyctx.OnReadyFromContext(cmd.Context()); ok && fn != nil { + fn() + } <-cmd.Context().Done() return err }, @@ -98,7 +115,7 @@ func TestLifecycle_SuccessfulStartAndShutdown(t *testing.T) { host := componenttest.NewNopHost() require.NoError(t, e.Start(ctx, host)) - require.Equal(t, stateRunning, e.state) + require.Eventually(t, func() bool { return e.state == stateRunning }, 1*time.Second, 25*time.Millisecond, "extension did not reach stateRunning") require.NoError(t, e.Ready()) require.NoError(t, e.NotReady()) @@ -132,11 +149,27 @@ func TestReadyWhenNotStarted(t *testing.T) { require.Error(t, e.NotReady()) } +func TestStayInStartingWhenReadyNotCalled(t *testing.T) { + e := newTestExtension(t, blockingCommandWithoutReady, defaultTestConfig()) + require.NoError(t, e.Start(context.Background(), componenttest.NewNopHost())) + + // Give the run goroutine time to start and block (without calling ready). + time.Sleep(50 * time.Millisecond) + + // We should still be in stateStarting since the ready callback was never invoked. + require.Equal(t, stateStarting, e.state) + + shutdownCtx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + t.Cleanup(cancel) + require.NoError(t, e.Shutdown(shutdownCtx)) +} + func TestShutdownWithRunCommandError(t *testing.T) { expected := errors.New("shutdown error") e := newTestExtension(t, func() *cobra.Command { return shutdownErrorCommand(expected) }, defaultTestConfig()) require.NoError(t, e.Start(context.Background(), componenttest.NewNopHost())) + require.Eventually(t, func() bool { return e.state == stateRunning }, 1*time.Second, 25*time.Millisecond, "extension did not reach stateRunning") shutdownCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) t.Cleanup(cancel) From 0f3d88445cc3b15561c9ba65b9f537092687513b Mon Sep 17 00:00:00 2001 From: Bejal Lewis <164711649+blewis12@users.noreply.github.com> Date: Mon, 16 Feb 2026 12:45:39 +0100 Subject: [PATCH 04/13] Clean up test names --- extension/alloyengine/extension.go | 1 - extension/alloyengine/extension_test.go | 10 +++++----- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/extension/alloyengine/extension.go b/extension/alloyengine/extension.go index cbac470fdef..010dfc47b37 100644 --- a/extension/alloyengine/extension.go +++ b/extension/alloyengine/extension.go @@ -88,7 +88,6 @@ func (e *alloyEngineExtension) isUp() int64 { return 0 } -// Start is called when the extension is started. func (e *alloyEngineExtension) Start(ctx context.Context, host component.Host) error { e.stateMutex.Lock() switch e.state { diff --git a/extension/alloyengine/extension_test.go b/extension/alloyengine/extension_test.go index 63eb0945728..aa0dcd70fc6 100644 --- a/extension/alloyengine/extension_test.go +++ b/extension/alloyengine/extension_test.go @@ -136,20 +136,20 @@ func TestLifecycle_SuccessfulStartAndShutdown(t *testing.T) { require.Equal(t, stateTerminated, e.state) } -func TestStartTwiceFails(t *testing.T) { +func TestLifecycle_StartTwiceFails(t *testing.T) { e := newTestExtension(t, blockingCommand, defaultTestConfig()) require.NoError(t, e.Start(context.Background(), componenttest.NewNopHost())) err := e.Start(context.Background(), componenttest.NewNopHost()) require.Error(t, err) } -func TestReadyWhenNotStarted(t *testing.T) { +func TestLifecycle_NotReadyWhenNotStarted(t *testing.T) { e := newTestExtension(t, blockingCommand, defaultTestConfig()) require.Error(t, e.Ready()) require.Error(t, e.NotReady()) } -func TestStayInStartingWhenReadyNotCalled(t *testing.T) { +func TestLifecycle_StayInStartingWhenReadyNotCalled(t *testing.T) { e := newTestExtension(t, blockingCommandWithoutReady, defaultTestConfig()) require.NoError(t, e.Start(context.Background(), componenttest.NewNopHost())) @@ -164,7 +164,7 @@ func TestStayInStartingWhenReadyNotCalled(t *testing.T) { require.NoError(t, e.Shutdown(shutdownCtx)) } -func TestShutdownWithRunCommandError(t *testing.T) { +func TestLifecycle_ShutdownWithRunCommandError(t *testing.T) { expected := errors.New("shutdown error") e := newTestExtension(t, func() *cobra.Command { return shutdownErrorCommand(expected) }, defaultTestConfig()) @@ -187,7 +187,7 @@ func TestShutdownWithRunCommandError(t *testing.T) { require.Equal(t, stateTerminated, e.state) } -func Test_RunSucceedsAfterRetries(t *testing.T) { +func TestLifecycle__RunSucceedsAfterRetries(t *testing.T) { testErr := errors.New("temporary failure") factory, state := newRetryTrackingCommand(2, testErr) // Fail 2 times, succeed on 3rd attempt cfg := defaultTestConfig() From 1dc9b8e4f50089374f4580a68593ec0e80a0a51f Mon Sep 17 00:00:00 2001 From: Bejal Lewis <164711649+blewis12@users.noreply.github.com> Date: Tue, 17 Feb 2026 13:04:09 +0100 Subject: [PATCH 05/13] Ensure NotReady() reflects Ready() to avoid confusion --- extension/alloyengine/extension.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extension/alloyengine/extension.go b/extension/alloyengine/extension.go index 010dfc47b37..91a9fc1fd31 100644 --- a/extension/alloyengine/extension.go +++ b/extension/alloyengine/extension.go @@ -217,7 +217,7 @@ func (e *alloyEngineExtension) NotReady() error { defer e.stateMutex.Unlock() switch e.state { - case stateRunning: + case stateStarting, stateRunning, stateRunError: return nil default: return fmt.Errorf("alloyengine extension not ready in current state: %s", e.state.String()) From afbab7b79faa02408ab07734de089cf36e27d2bf Mon Sep 17 00:00:00 2001 From: Bejal Lewis <164711649+blewis12@users.noreply.github.com> Date: Tue, 17 Feb 2026 13:13:29 +0100 Subject: [PATCH 06/13] Clean up state management locking --- extension/alloyengine/extension.go | 41 +++++++++++++----------------- 1 file changed, 18 insertions(+), 23 deletions(-) diff --git a/extension/alloyengine/extension.go b/extension/alloyengine/extension.go index 91a9fc1fd31..95dcfab9b14 100644 --- a/extension/alloyengine/extension.go +++ b/extension/alloyengine/extension.go @@ -39,6 +39,12 @@ func (e *alloyEngineExtension) setState(newState state) { } } +func (e *alloyEngineExtension) getState() state { + e.stateMutex.Lock() + defer e.stateMutex.Unlock() + return e.state +} + func (s state) String() string { switch s { case stateNotStarted: @@ -89,15 +95,13 @@ func (e *alloyEngineExtension) isUp() int64 { } func (e *alloyEngineExtension) Start(ctx context.Context, host component.Host) error { - e.stateMutex.Lock() - switch e.state { + currentState := e.getState() + switch currentState { case stateNotStarted: break default: - e.stateMutex.Unlock() - return fmt.Errorf("cannot start alloyengine extension in current state: %s", e.state.String()) + return fmt.Errorf("cannot start alloyengine extension in current state: %s", currentState) } - e.stateMutex.Unlock() runCtx, runCancel := context.WithCancel(context.Background()) e.runCancel = runCancel @@ -121,9 +125,7 @@ func (e *alloyEngineExtension) Start(ctx context.Context, host component.Host) e err := e.runWithBackoffRetry(runCommand, runCtx) - e.stateMutex.Lock() - previousState := e.state - e.stateMutex.Unlock() + previousState := currentState e.setState(stateTerminated) if err == nil { @@ -171,11 +173,10 @@ func (e *alloyEngineExtension) runWithBackoffRetry(runCommand *cobra.Command, ct // Shutdown is called when the extension is being stopped. func (e *alloyEngineExtension) Shutdown(ctx context.Context) error { - e.stateMutex.Lock() - switch e.state { + currentState := e.getState() + switch currentState { case stateRunning: e.settings.Logger.Info("alloyengine extension shutting down") - e.stateMutex.Unlock() e.setState(stateShuttingDown) // guaranteed to be non-nil since we are in stateRunning e.runCancel() @@ -189,37 +190,31 @@ func (e *alloyEngineExtension) Shutdown(ctx context.Context) error { return nil case stateNotStarted: e.settings.Logger.Info("alloyengine extension shutdown completed (not started)") - e.stateMutex.Unlock() return nil default: e.settings.Logger.Warn("alloyengine extension shutdown in current state is a no-op", zap.String("state", e.state.String())) - e.stateMutex.Unlock() return nil } } // Ready returns nil when the extension is ready to process data. func (e *alloyEngineExtension) Ready() error { - e.stateMutex.Lock() - defer e.stateMutex.Unlock() - - switch e.state { + currentState := e.getState() + switch currentState { case stateStarting, stateRunning, stateRunError: return nil default: - return fmt.Errorf("alloyengine extension not ready in current state: %s", e.state.String()) + return fmt.Errorf("alloyengine extension not ready in current state: %s", currentState.String()) } } // NotReady returns an error when the extension is not ready to process data. func (e *alloyEngineExtension) NotReady() error { - e.stateMutex.Lock() - defer e.stateMutex.Unlock() - - switch e.state { + currentState := e.getState() + switch currentState { case stateStarting, stateRunning, stateRunError: return nil default: - return fmt.Errorf("alloyengine extension not ready in current state: %s", e.state.String()) + return fmt.Errorf("alloyengine extension not ready in current state: %s", currentState.String()) } } From a64a9a18cfea55f065ffbce2611a10aab18dbe93 Mon Sep 17 00:00:00 2001 From: Bejal Lewis <164711649+blewis12@users.noreply.github.com> Date: Tue, 17 Feb 2026 13:14:49 +0100 Subject: [PATCH 07/13] Remove unused IsUp function Leftover from doing metrics exploration --- extension/alloyengine/extension.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/extension/alloyengine/extension.go b/extension/alloyengine/extension.go index 95dcfab9b14..88d9ee03e4d 100644 --- a/extension/alloyengine/extension.go +++ b/extension/alloyengine/extension.go @@ -85,15 +85,6 @@ func newAlloyEngineExtension(config *Config, settings component.TelemetrySetting } } -func (e *alloyEngineExtension) isUp() int64 { - e.stateMutex.Lock() - defer e.stateMutex.Unlock() - if e.state == stateRunning { - return 1 - } - return 0 -} - func (e *alloyEngineExtension) Start(ctx context.Context, host component.Host) error { currentState := e.getState() switch currentState { From c7f0997280d7d381680395a36cfe62839add9220 Mon Sep 17 00:00:00 2001 From: Bejal Lewis <164711649+blewis12@users.noreply.github.com> Date: Wed, 18 Feb 2026 10:08:25 +0100 Subject: [PATCH 08/13] Ensure all states are handled in Shutdown() --- extension/alloyengine/extension.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extension/alloyengine/extension.go b/extension/alloyengine/extension.go index 88d9ee03e4d..77c61847cd7 100644 --- a/extension/alloyengine/extension.go +++ b/extension/alloyengine/extension.go @@ -166,7 +166,7 @@ func (e *alloyEngineExtension) runWithBackoffRetry(runCommand *cobra.Command, ct func (e *alloyEngineExtension) Shutdown(ctx context.Context) error { currentState := e.getState() switch currentState { - case stateRunning: + case stateStarting, stateRunning, stateRunError: e.settings.Logger.Info("alloyengine extension shutting down") e.setState(stateShuttingDown) // guaranteed to be non-nil since we are in stateRunning From 14a8c6d963be8c678271fc09999b09485db3ce9f Mon Sep 17 00:00:00 2001 From: Bejal Lewis <164711649+blewis12@users.noreply.github.com> Date: Wed, 18 Feb 2026 10:12:37 +0100 Subject: [PATCH 09/13] Always use GetState() --- extension/alloyengine/extension.go | 2 +- extension/alloyengine/extension_test.go | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/extension/alloyengine/extension.go b/extension/alloyengine/extension.go index 77c61847cd7..cf63af7a585 100644 --- a/extension/alloyengine/extension.go +++ b/extension/alloyengine/extension.go @@ -183,7 +183,7 @@ func (e *alloyEngineExtension) Shutdown(ctx context.Context) error { e.settings.Logger.Info("alloyengine extension shutdown completed (not started)") return nil default: - e.settings.Logger.Warn("alloyengine extension shutdown in current state is a no-op", zap.String("state", e.state.String())) + e.settings.Logger.Warn("alloyengine extension shutdown in current state is a no-op", zap.String("state", e.getState().String())) return nil } } diff --git a/extension/alloyengine/extension_test.go b/extension/alloyengine/extension_test.go index aa0dcd70fc6..a8f67a7f483 100644 --- a/extension/alloyengine/extension_test.go +++ b/extension/alloyengine/extension_test.go @@ -115,7 +115,7 @@ func TestLifecycle_SuccessfulStartAndShutdown(t *testing.T) { host := componenttest.NewNopHost() require.NoError(t, e.Start(ctx, host)) - require.Eventually(t, func() bool { return e.state == stateRunning }, 1*time.Second, 25*time.Millisecond, "extension did not reach stateRunning") + require.Eventually(t, func() bool { return e.getState() == stateRunning }, 1*time.Second, 25*time.Millisecond, "extension did not reach stateRunning") require.NoError(t, e.Ready()) require.NoError(t, e.NotReady()) @@ -133,7 +133,7 @@ func TestLifecycle_SuccessfulStartAndShutdown(t *testing.T) { return false } }, 1*time.Second, 25*time.Millisecond, "run command did not exit in time") - require.Equal(t, stateTerminated, e.state) + require.Equal(t, stateTerminated, e.getState()) } func TestLifecycle_StartTwiceFails(t *testing.T) { @@ -157,7 +157,7 @@ func TestLifecycle_StayInStartingWhenReadyNotCalled(t *testing.T) { time.Sleep(50 * time.Millisecond) // We should still be in stateStarting since the ready callback was never invoked. - require.Equal(t, stateStarting, e.state) + require.Equal(t, stateStarting, e.getState()) shutdownCtx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) t.Cleanup(cancel) @@ -169,7 +169,7 @@ func TestLifecycle_ShutdownWithRunCommandError(t *testing.T) { e := newTestExtension(t, func() *cobra.Command { return shutdownErrorCommand(expected) }, defaultTestConfig()) require.NoError(t, e.Start(context.Background(), componenttest.NewNopHost())) - require.Eventually(t, func() bool { return e.state == stateRunning }, 1*time.Second, 25*time.Millisecond, "extension did not reach stateRunning") + require.Eventually(t, func() bool { return e.getState() == stateRunning }, 1*time.Second, 25*time.Millisecond, "extension did not reach stateRunning") shutdownCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) t.Cleanup(cancel) @@ -184,10 +184,10 @@ func TestLifecycle_ShutdownWithRunCommandError(t *testing.T) { return false } }, 1*time.Second, 25*time.Millisecond, "run command did not exit in time") - require.Equal(t, stateTerminated, e.state) + require.Equal(t, stateTerminated, e.getState()) } -func TestLifecycle__RunSucceedsAfterRetries(t *testing.T) { +func TestLifecycle_RunSucceedsAfterRetries(t *testing.T) { testErr := errors.New("temporary failure") factory, state := newRetryTrackingCommand(2, testErr) // Fail 2 times, succeed on 3rd attempt cfg := defaultTestConfig() @@ -208,5 +208,5 @@ func TestLifecycle__RunSucceedsAfterRetries(t *testing.T) { // Verify it succeeded after 3 attempts (2 failures + 1 success) require.Equal(t, 3, state.attempts) require.Equal(t, 3, state.succeededAt) - require.Equal(t, stateTerminated, e.state) + require.Equal(t, stateTerminated, e.getState()) } From 902d6b41123a145df2c93bc59b8237f86d81ec57 Mon Sep 17 00:00:00 2001 From: Bejal Lewis <164711649+blewis12@users.noreply.github.com> Date: Wed, 18 Feb 2026 11:24:43 +0100 Subject: [PATCH 10/13] use iota --- extension/alloyengine/extension.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/extension/alloyengine/extension.go b/extension/alloyengine/extension.go index cf63af7a585..5e0dedabbfd 100644 --- a/extension/alloyengine/extension.go +++ b/extension/alloyengine/extension.go @@ -20,13 +20,13 @@ var _ extension.Extension = (*alloyEngineExtension)(nil) type state int -var ( - stateNotStarted state = 0 - stateStarting state = 1 - stateRunning state = 2 - stateShuttingDown state = 3 - stateTerminated state = 4 - stateRunError state = 5 +const ( + stateNotStarted state = iota + stateStarting + stateRunning + stateShuttingDown + stateTerminated + stateRunError ) func (e *alloyEngineExtension) setState(newState state) { From 7721d67625ddb30fb4db634a88c8d9e4554ed0df Mon Sep 17 00:00:00 2001 From: Bejal Lewis <164711649+blewis12@users.noreply.github.com> Date: Wed, 18 Feb 2026 11:26:24 +0100 Subject: [PATCH 11/13] fix issue with re-using currentState within spawned goroutine --- extension/alloyengine/extension.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extension/alloyengine/extension.go b/extension/alloyengine/extension.go index 5e0dedabbfd..defc050c176 100644 --- a/extension/alloyengine/extension.go +++ b/extension/alloyengine/extension.go @@ -116,7 +116,7 @@ func (e *alloyEngineExtension) Start(ctx context.Context, host component.Host) e err := e.runWithBackoffRetry(runCommand, runCtx) - previousState := currentState + previousState := e.getState() e.setState(stateTerminated) if err == nil { From 968152d621c73a4cc1d941a9ebab21f9ce83b3ce Mon Sep 17 00:00:00 2001 From: Bejal Lewis <164711649+blewis12@users.noreply.github.com> Date: Wed, 18 Feb 2026 11:28:46 +0100 Subject: [PATCH 12/13] Update comment to be clearer --- extension/alloyengine/extension.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extension/alloyengine/extension.go b/extension/alloyengine/extension.go index defc050c176..e5cad28dbb6 100644 --- a/extension/alloyengine/extension.go +++ b/extension/alloyengine/extension.go @@ -169,7 +169,7 @@ func (e *alloyEngineExtension) Shutdown(ctx context.Context) error { case stateStarting, stateRunning, stateRunError: e.settings.Logger.Info("alloyengine extension shutting down") e.setState(stateShuttingDown) - // guaranteed to be non-nil since we are in stateRunning + // guaranteed to be non-nil since runCancel is set in Start() e.runCancel() select { From dc6699768b0027091b9606d3dec38d3be41773e8 Mon Sep 17 00:00:00 2001 From: Bejal Lewis <164711649+blewis12@users.noreply.github.com> Date: Fri, 20 Feb 2026 17:36:05 +0100 Subject: [PATCH 13/13] only allocate runCtx, runCancel and runExited after flag parse succeeds --- extension/alloyengine/extension.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/extension/alloyengine/extension.go b/extension/alloyengine/extension.go index e5cad28dbb6..60296439a3f 100644 --- a/extension/alloyengine/extension.go +++ b/extension/alloyengine/extension.go @@ -94,6 +94,13 @@ func (e *alloyEngineExtension) Start(ctx context.Context, host component.Host) e return fmt.Errorf("cannot start alloyengine extension in current state: %s", currentState) } + runCommand := e.runCommandFactory() + runCommand.SetArgs([]string{e.config.AlloyConfig.File}) + err := runCommand.ParseFlags(e.config.flagsAsSlice()) + if err != nil { + return fmt.Errorf("failed to parse flags: %w", err) + } + runCtx, runCancel := context.WithCancel(context.Background()) e.runCancel = runCancel e.runExited = make(chan struct{}) @@ -102,13 +109,6 @@ func (e *alloyEngineExtension) Start(ctx context.Context, host component.Host) e e.setState(stateRunning) }) - runCommand := e.runCommandFactory() - runCommand.SetArgs([]string{e.config.AlloyConfig.File}) - err := runCommand.ParseFlags(e.config.flagsAsSlice()) - if err != nil { - return fmt.Errorf("failed to parse flags: %w", err) - } - e.setState(stateStarting) go func() {