diff --git a/lib/auth/auth.go b/lib/auth/auth.go index 38069df44156a..4265c98e1e591 100644 --- a/lib/auth/auth.go +++ b/lib/auth/auth.go @@ -814,7 +814,7 @@ func NewServer(cfg *InitConfig, opts ...ServerOption) (as *Server, err error) { return nil, trace.Wrap(err) } - as.botVersionReporter, err = machineidv1.NewAutoUpdateVersionReporter(machineidv1.AutoUpdateVersionReporterConfig{ + as.BotInstanceVersionReporter, err = machineidv1.NewAutoUpdateVersionReporter(machineidv1.AutoUpdateVersionReporterConfig{ Clock: cfg.Clock, Logger: as.logger.With( teleport.ComponentKey, @@ -828,9 +828,6 @@ func NewServer(cfg *InitConfig, opts ...ServerOption) (as *Server, err error) { if err != nil { return nil, trace.Wrap(err) } - if err := as.botVersionReporter.Run(as.CloseContext()); err != nil { - return nil, trace.Wrap(err) - } if _, ok := as.getCache(); !ok { as.logger.WarnContext(closeCtx, "Auth server starting without cache (may have negative performance implications)") @@ -1340,9 +1337,9 @@ type Server struct { // plugin. The summarizer itself summarizes session recordings. sessionSummarizerProvider *summarizer.SessionSummarizerProvider - // botVersionReporter is called periodically to generate a report of the - // number of bot instances by version and update group. - botVersionReporter *machineidv1.AutoUpdateVersionReporter + // BotInstanceVersionReporter is called periodically to generate a report of + // the number of bot instances by version and update group. + BotInstanceVersionReporter *machineidv1.AutoUpdateVersionReporter } // SetSAMLService registers svc as the SAMLService that provides the SAML @@ -1858,7 +1855,7 @@ func (a *Server) runPeriodicOperations() { case autoUpdateAgentReportKey: go a.reportAgentVersions(a.closeCtx) case autoUpdateBotInstanceReportKey: - go a.botVersionReporter.Report(a.closeCtx) + go a.BotInstanceVersionReporter.Report(a.closeCtx) case autoUpdateBotInstanceMetricsKey: go a.updateBotInstanceMetrics() } diff --git a/lib/auth/machineid/machineidv1/auto_update_version_reporter.go b/lib/auth/machineid/machineidv1/auto_update_version_reporter.go index 3c1c608d5018a..756620b3494db 100644 --- a/lib/auth/machineid/machineidv1/auto_update_version_reporter.go +++ b/lib/auth/machineid/machineidv1/auto_update_version_reporter.go @@ -130,36 +130,33 @@ func (r *AutoUpdateVersionReporter) Run(ctx context.Context) error { return trace.Wrap(err) } - go func() { - defer r.logger.DebugContext(ctx, "Shutting down") + defer r.logger.DebugContext(ctx, "Shutting down") - for { - started := r.clock.Now() - r.runLeader(ctx) - leaderFor := r.clock.Now().Sub(started) + for { + started := r.clock.Now() + r.runLeader(ctx) + leaderFor := r.clock.Since(started) - // Context is done, exit immediately. - if ctx.Err() != nil { - return - } + // Context is done, exit immediately. + if ctx.Err() != nil { + return nil + } - // If we were leader for a decent amount of time, any previous - // backoff likely doesn't apply anymore. - if leaderFor > 5*time.Minute { - retry.Reset() - } + // If we were leader for a decent amount of time, any previous + // backoff likely doesn't apply anymore. + if leaderFor > 5*time.Minute { + retry.Reset() + } - // Wait for the next retry interval. - retry.Inc() + // Wait for the next retry interval. + retry.Inc() - select { - case <-retry.After(): - case <-ctx.Done(): - return - } + select { + case <-retry.After(): + case <-ctx.Done(): + return nil } - }() - return nil + } } func (r *AutoUpdateVersionReporter) runLeader(ctx context.Context) error { diff --git a/lib/auth/machineid/machineidv1/auto_update_version_reporter_test.go b/lib/auth/machineid/machineidv1/auto_update_version_reporter_test.go index 6bb33ca5ff50a..c87314315c46f 100644 --- a/lib/auth/machineid/machineidv1/auto_update_version_reporter_test.go +++ b/lib/auth/machineid/machineidv1/auto_update_version_reporter_test.go @@ -48,7 +48,9 @@ import ( func TestAutoUpdateVersionReporter(t *testing.T) { t.Parallel() - ctx := t.Context() + ctx, cancel := context.WithCancel(t.Context()) + t.Cleanup(cancel) + clock := clockwork.NewFakeClockAt(time.Now().UTC()) backend, err := memory.New(memory.Config{ @@ -105,7 +107,9 @@ func TestAutoUpdateVersionReporter(t *testing.T) { require.NoError(t, err) // Run the leader election process. Wait for the semaphore to be acquired. - require.NoError(t, reporter.Run(ctx)) + errCh := make(chan error, 1) + go func() { errCh <- reporter.Run(ctx) }() + select { case <-reporter.LeaderCh(): case <-time.After(1 * time.Second): @@ -149,6 +153,9 @@ func TestAutoUpdateVersionReporter(t *testing.T) { if diff != "" { t.Fatal(diff) } + + cancel() + require.NoError(t, <-errCh) } func TestEmitInstancesMetric(t *testing.T) { diff --git a/lib/service/service.go b/lib/service/service.go index d90e18a0ff631..b77d29f9c3497 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -2852,6 +2852,9 @@ func (process *TeleportProcess) initAuthService() error { process.RegisterFunc("auth.autoupdate_agent_rollout_controller", func() error { return trace.Wrap(agentRolloutController.Run(process.GracefulExitContext()), "running autoupdate_agent_rollout controller") }) + process.RegisterFunc("auth.autoupdate_bot_instance_version_reporter", func() error { + return trace.Wrap(authServer.BotInstanceVersionReporter.Run(process.GracefulExitContext())) + }) process.RegisterFunc("auth.server_info", func() error { return trace.Wrap(auth.ReconcileServerInfos(process.GracefulExitContext(), authServer))