diff --git a/extension/alloyengine/extension.go b/extension/alloyengine/extension.go index 98f9582573e..60296439a3f 100644 --- a/extension/alloyengine/extension.go +++ b/extension/alloyengine/extension.go @@ -12,6 +12,7 @@ import ( "go.uber.org/zap" "github.com/grafana/alloy/flowcmd" + "github.com/grafana/alloy/internal/readyctx" "github.com/spf13/cobra" ) @@ -19,23 +20,45 @@ var _ extension.Extension = (*alloyEngineExtension)(nil) type state int -var ( - stateNotStarted state = 0 - stateRunning state = 1 - stateShuttingDown state = 2 - stateTerminated state = 3 +const ( + stateNotStarted state = iota + stateStarting + stateRunning + stateShuttingDown + stateTerminated + stateRunError ) +func (e *alloyEngineExtension) setState(newState state) { + e.stateMutex.Lock() + defer e.stateMutex.Unlock() + oldState := e.state + e.state = newState + if oldState != newState { + e.settings.Logger.Info("alloyengine extension state changed", zap.String("from", oldState.String()), zap.String("to", newState.String())) + } +} + +func (e *alloyEngineExtension) getState() state { + e.stateMutex.Lock() + defer e.stateMutex.Unlock() + return e.state +} + func (s state) String() string { switch s { case stateNotStarted: return "not_started" + case stateStarting: + return "starting" case stateRunning: return "running" case stateShuttingDown: return "shutting_down" case stateTerminated: return "terminated" + case stateRunError: + return "run_error" } return fmt.Sprintf("unknown_state_%d", s) } @@ -62,22 +85,15 @@ func newAlloyEngineExtension(config *Config, settings component.TelemetrySetting } } -// Start is called when the extension is started. func (e *alloyEngineExtension) Start(ctx context.Context, host component.Host) error { - e.stateMutex.Lock() - defer e.stateMutex.Unlock() - - switch e.state { + currentState := e.getState() + switch currentState { case stateNotStarted: break default: - return fmt.Errorf("cannot start alloyengine extension in current state: %s", e.state.String()) + return fmt.Errorf("cannot start alloyengine extension in current state: %s", currentState) } - runCtx, runCancel := context.WithCancel(context.Background()) - e.runCancel = runCancel - e.runExited = make(chan struct{}) - runCommand := e.runCommandFactory() runCommand.SetArgs([]string{e.config.AlloyConfig.File}) err := runCommand.ParseFlags(e.config.flagsAsSlice()) @@ -85,15 +101,23 @@ func (e *alloyEngineExtension) Start(ctx context.Context, host component.Host) e return fmt.Errorf("failed to parse flags: %w", err) } + runCtx, runCancel := context.WithCancel(context.Background()) + e.runCancel = runCancel + e.runExited = make(chan struct{}) + + runCtx = readyctx.WithOnReady(runCtx, func() { + e.setState(stateRunning) + }) + + e.setState(stateStarting) + go func() { defer close(e.runExited) err := e.runWithBackoffRetry(runCommand, runCtx) - e.stateMutex.Lock() - previousState := e.state - e.state = stateTerminated - e.stateMutex.Unlock() + previousState := e.getState() + e.setState(stateTerminated) if err == nil { e.settings.Logger.Debug("run command exited successfully without error") @@ -101,9 +125,6 @@ func (e *alloyEngineExtension) Start(ctx context.Context, host component.Host) e e.settings.Logger.Warn("run command exited with an error during shutdown", zap.Error(err)) } }() - - e.state = stateRunning - e.settings.Logger.Info("alloyengine extension started successfully") return nil } @@ -120,6 +141,7 @@ func (e *alloyEngineExtension) runWithBackoffRetry(runCommand *cobra.Command, ct } lastError = err + e.setState(stateRunError) // exponential backoff until 15s delay := 15 * time.Second @@ -142,15 +164,13 @@ func (e *alloyEngineExtension) runWithBackoffRetry(runCommand *cobra.Command, ct // Shutdown is called when the extension is being stopped. func (e *alloyEngineExtension) Shutdown(ctx context.Context) error { - e.stateMutex.Lock() - switch e.state { - case stateRunning: + currentState := e.getState() + switch currentState { + case stateStarting, stateRunning, stateRunError: e.settings.Logger.Info("alloyengine extension shutting down") - e.state = stateShuttingDown - // guaranteed to be non-nil since we are in stateRunning + e.setState(stateShuttingDown) + // guaranteed to be non-nil since runCancel is set in Start() e.runCancel() - // unlock so that the run goroutine can transition to terminated - e.stateMutex.Unlock() select { case <-e.runExited: @@ -161,37 +181,31 @@ func (e *alloyEngineExtension) Shutdown(ctx context.Context) error { return nil case stateNotStarted: e.settings.Logger.Info("alloyengine extension shutdown completed (not started)") - e.stateMutex.Unlock() return nil default: - e.settings.Logger.Warn("alloyengine extension shutdown in current state is a no-op", zap.String("state", e.state.String())) - e.stateMutex.Unlock() + e.settings.Logger.Warn("alloyengine extension shutdown in current state is a no-op", zap.String("state", e.getState().String())) return nil } } // Ready returns nil when the extension is ready to process data. func (e *alloyEngineExtension) Ready() error { - e.stateMutex.Lock() - defer e.stateMutex.Unlock() - - switch e.state { - case stateRunning: + currentState := e.getState() + switch currentState { + case stateStarting, stateRunning, stateRunError: return nil default: - return fmt.Errorf("alloyengine extension not ready in current state: %s", e.state.String()) + return fmt.Errorf("alloyengine extension not ready in current state: %s", currentState.String()) } } // NotReady returns an error when the extension is not ready to process data. func (e *alloyEngineExtension) NotReady() error { - e.stateMutex.Lock() - defer e.stateMutex.Unlock() - - switch e.state { - case stateRunning: + currentState := e.getState() + switch currentState { + case stateStarting, stateRunning, stateRunError: return nil default: - return fmt.Errorf("alloyengine extension not ready in current state: %s", e.state.String()) + return fmt.Errorf("alloyengine extension not ready in current state: %s", currentState.String()) } } diff --git a/extension/alloyengine/extension_test.go b/extension/alloyengine/extension_test.go index 6cfec0e5464..a8f67a7f483 100644 --- a/extension/alloyengine/extension_test.go +++ b/extension/alloyengine/extension_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/grafana/alloy/internal/readyctx" "github.com/spf13/cobra" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" @@ -33,6 +34,19 @@ func newTestExtension(t *testing.T, factory func() *cobra.Command, config *Confi // blockingCommand returns a cobra command that blocks until the context is cancelled, then returns nil. func blockingCommand() *cobra.Command { + return &cobra.Command{ + RunE: func(cmd *cobra.Command, args []string) error { + if fn, ok := readyctx.OnReadyFromContext(cmd.Context()); ok && fn != nil { + fn() + } + <-cmd.Context().Done() + return nil + }, + } +} + +// blockingCommandWithoutReady blocks until context cancellation but never calls the ready callback. +func blockingCommandWithoutReady() *cobra.Command { return &cobra.Command{ RunE: func(cmd *cobra.Command, args []string) error { <-cmd.Context().Done() @@ -45,6 +59,9 @@ func blockingCommand() *cobra.Command { func shutdownErrorCommand(err error) *cobra.Command { return &cobra.Command{ RunE: func(cmd *cobra.Command, args []string) error { + if fn, ok := readyctx.OnReadyFromContext(cmd.Context()); ok && fn != nil { + fn() + } <-cmd.Context().Done() return err }, @@ -98,7 +115,7 @@ func TestLifecycle_SuccessfulStartAndShutdown(t *testing.T) { host := componenttest.NewNopHost() require.NoError(t, e.Start(ctx, host)) - require.Equal(t, stateRunning, e.state) + require.Eventually(t, func() bool { return e.getState() == stateRunning }, 1*time.Second, 25*time.Millisecond, "extension did not reach stateRunning") require.NoError(t, e.Ready()) require.NoError(t, e.NotReady()) @@ -116,27 +133,43 @@ func TestLifecycle_SuccessfulStartAndShutdown(t *testing.T) { return false } }, 1*time.Second, 25*time.Millisecond, "run command did not exit in time") - require.Equal(t, stateTerminated, e.state) + require.Equal(t, stateTerminated, e.getState()) } -func TestStartTwiceFails(t *testing.T) { +func TestLifecycle_StartTwiceFails(t *testing.T) { e := newTestExtension(t, blockingCommand, defaultTestConfig()) require.NoError(t, e.Start(context.Background(), componenttest.NewNopHost())) err := e.Start(context.Background(), componenttest.NewNopHost()) require.Error(t, err) } -func TestReadyWhenNotStarted(t *testing.T) { +func TestLifecycle_NotReadyWhenNotStarted(t *testing.T) { e := newTestExtension(t, blockingCommand, defaultTestConfig()) require.Error(t, e.Ready()) require.Error(t, e.NotReady()) } -func TestShutdownWithRunCommandError(t *testing.T) { +func TestLifecycle_StayInStartingWhenReadyNotCalled(t *testing.T) { + e := newTestExtension(t, blockingCommandWithoutReady, defaultTestConfig()) + require.NoError(t, e.Start(context.Background(), componenttest.NewNopHost())) + + // Give the run goroutine time to start and block (without calling ready). + time.Sleep(50 * time.Millisecond) + + // We should still be in stateStarting since the ready callback was never invoked. + require.Equal(t, stateStarting, e.getState()) + + shutdownCtx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + t.Cleanup(cancel) + require.NoError(t, e.Shutdown(shutdownCtx)) +} + +func TestLifecycle_ShutdownWithRunCommandError(t *testing.T) { expected := errors.New("shutdown error") e := newTestExtension(t, func() *cobra.Command { return shutdownErrorCommand(expected) }, defaultTestConfig()) require.NoError(t, e.Start(context.Background(), componenttest.NewNopHost())) + require.Eventually(t, func() bool { return e.getState() == stateRunning }, 1*time.Second, 25*time.Millisecond, "extension did not reach stateRunning") shutdownCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) t.Cleanup(cancel) @@ -151,10 +184,10 @@ func TestShutdownWithRunCommandError(t *testing.T) { return false } }, 1*time.Second, 25*time.Millisecond, "run command did not exit in time") - require.Equal(t, stateTerminated, e.state) + require.Equal(t, stateTerminated, e.getState()) } -func Test_RunSucceedsAfterRetries(t *testing.T) { +func TestLifecycle_RunSucceedsAfterRetries(t *testing.T) { testErr := errors.New("temporary failure") factory, state := newRetryTrackingCommand(2, testErr) // Fail 2 times, succeed on 3rd attempt cfg := defaultTestConfig() @@ -175,5 +208,5 @@ func Test_RunSucceedsAfterRetries(t *testing.T) { // Verify it succeeded after 3 attempts (2 failures + 1 success) require.Equal(t, 3, state.attempts) require.Equal(t, 3, state.succeededAt) - require.Equal(t, stateTerminated, e.state) + require.Equal(t, stateTerminated, e.getState()) } diff --git a/internal/alloycli/cmd_run.go b/internal/alloycli/cmd_run.go index aa773da8b06..217c81b962d 100644 --- a/internal/alloycli/cmd_run.go +++ b/internal/alloycli/cmd_run.go @@ -34,6 +34,7 @@ import ( "github.com/grafana/alloy/internal/converter" convert_diag "github.com/grafana/alloy/internal/converter/diag" "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/internal/readyctx" alloy_runtime "github.com/grafana/alloy/internal/runtime" "github.com/grafana/alloy/internal/runtime/logging" "github.com/grafana/alloy/internal/runtime/logging/level" @@ -462,6 +463,11 @@ func (fr *alloyRun) Run(cmd *cobra.Command, configPath string) error { return err } + // Signal to the caller (e.g. alloyengine extension) that the default engine is running + if fn, ok := readyctx.OnReadyFromContext(ctx); ok && fn != nil { + fn() + } + // By now, have either joined or started a new cluster. // Nodes initially join in the Viewer state. After the graph has been // loaded successfully, we can move to the Participant state to signal that diff --git a/internal/readyctx/readyctx.go b/internal/readyctx/readyctx.go new file mode 100644 index 00000000000..69a4a2128ef --- /dev/null +++ b/internal/readyctx/readyctx.go @@ -0,0 +1,21 @@ +package readyctx + +import "context" + +type ctxKey struct{} + +func WithOnReady(ctx context.Context, fn func()) context.Context { + if fn == nil { + return ctx + } + return context.WithValue(ctx, ctxKey{}, fn) +} + +func OnReadyFromContext(ctx context.Context) (fn func(), ok bool) { + v := ctx.Value(ctxKey{}) + if v == nil { + return nil, false + } + fn, ok = v.(func()) + return fn, ok +}