Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions lib/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -814,7 +814,7 @@ func NewServer(cfg *InitConfig, opts ...ServerOption) (as *Server, err error) {
return nil, trace.Wrap(err)
}

as.botVersionReporter, err = machineidv1.NewAutoUpdateVersionReporter(machineidv1.AutoUpdateVersionReporterConfig{
as.BotInstanceVersionReporter, err = machineidv1.NewAutoUpdateVersionReporter(machineidv1.AutoUpdateVersionReporterConfig{
Clock: cfg.Clock,
Logger: as.logger.With(
teleport.ComponentKey,
Expand All @@ -828,9 +828,6 @@ func NewServer(cfg *InitConfig, opts ...ServerOption) (as *Server, err error) {
if err != nil {
return nil, trace.Wrap(err)
}
if err := as.botVersionReporter.Run(as.CloseContext()); err != nil {
return nil, trace.Wrap(err)
}

if _, ok := as.getCache(); !ok {
as.logger.WarnContext(closeCtx, "Auth server starting without cache (may have negative performance implications)")
Expand Down Expand Up @@ -1340,9 +1337,9 @@ type Server struct {
// plugin. The summarizer itself summarizes session recordings.
sessionSummarizerProvider *summarizer.SessionSummarizerProvider

// botVersionReporter is called periodically to generate a report of the
// number of bot instances by version and update group.
botVersionReporter *machineidv1.AutoUpdateVersionReporter
// BotInstanceVersionReporter is called periodically to generate a report of
// the number of bot instances by version and update group.
BotInstanceVersionReporter *machineidv1.AutoUpdateVersionReporter
}

// SetSAMLService registers svc as the SAMLService that provides the SAML
Expand Down Expand Up @@ -1858,7 +1855,7 @@ func (a *Server) runPeriodicOperations() {
case autoUpdateAgentReportKey:
go a.reportAgentVersions(a.closeCtx)
case autoUpdateBotInstanceReportKey:
go a.botVersionReporter.Report(a.closeCtx)
go a.BotInstanceVersionReporter.Report(a.closeCtx)
case autoUpdateBotInstanceMetricsKey:
go a.updateBotInstanceMetrics()
}
Expand Down
45 changes: 21 additions & 24 deletions lib/auth/machineid/machineidv1/auto_update_version_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,36 +130,33 @@ func (r *AutoUpdateVersionReporter) Run(ctx context.Context) error {
return trace.Wrap(err)
}

go func() {
defer r.logger.DebugContext(ctx, "Shutting down")
defer r.logger.DebugContext(ctx, "Shutting down")

for {
started := r.clock.Now()
r.runLeader(ctx)
leaderFor := r.clock.Now().Sub(started)
for {
started := r.clock.Now()
r.runLeader(ctx)
leaderFor := r.clock.Since(started)

// Context is done, exit immediately.
if ctx.Err() != nil {
return
}
// Context is done, exit immediately.
if ctx.Err() != nil {
return nil
}

// If we were leader for a decent amount of time, any previous
// backoff likely doesn't apply anymore.
if leaderFor > 5*time.Minute {
retry.Reset()
}
// If we were leader for a decent amount of time, any previous
// backoff likely doesn't apply anymore.
if leaderFor > 5*time.Minute {
retry.Reset()
}

// Wait for the next retry interval.
retry.Inc()
// Wait for the next retry interval.
retry.Inc()

select {
case <-retry.After():
case <-ctx.Done():
return
}
select {
case <-retry.After():
case <-ctx.Done():
return nil
}
}()
return nil
}
}

func (r *AutoUpdateVersionReporter) runLeader(ctx context.Context) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ import (
func TestAutoUpdateVersionReporter(t *testing.T) {
t.Parallel()

ctx := t.Context()
ctx, cancel := context.WithCancel(t.Context())
t.Cleanup(cancel)

clock := clockwork.NewFakeClockAt(time.Now().UTC())

backend, err := memory.New(memory.Config{
Expand Down Expand Up @@ -105,7 +107,9 @@ func TestAutoUpdateVersionReporter(t *testing.T) {
require.NoError(t, err)

// Run the leader election process. Wait for the semaphore to be acquired.
require.NoError(t, reporter.Run(ctx))
errCh := make(chan error, 1)
go func() { errCh <- reporter.Run(ctx) }()

select {
case <-reporter.LeaderCh():
case <-time.After(1 * time.Second):
Expand Down Expand Up @@ -149,6 +153,9 @@ func TestAutoUpdateVersionReporter(t *testing.T) {
if diff != "" {
t.Fatal(diff)
}

cancel()
require.NoError(t, <-errCh)
}

func TestEmitInstancesMetric(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2852,6 +2852,9 @@ func (process *TeleportProcess) initAuthService() error {
process.RegisterFunc("auth.autoupdate_agent_rollout_controller", func() error {
return trace.Wrap(agentRolloutController.Run(process.GracefulExitContext()), "running autoupdate_agent_rollout controller")
})
process.RegisterFunc("auth.autoupdate_bot_instance_version_reporter", func() error {
return trace.Wrap(authServer.BotInstanceVersionReporter.Run(process.GracefulExitContext()))
})

process.RegisterFunc("auth.server_info", func() error {
return trace.Wrap(auth.ReconcileServerInfos(process.GracefulExitContext(), authServer))
Expand Down
Loading