diff --git a/lib/tbot/bot/bot.go b/lib/tbot/bot/bot.go index de47d96396d59..3eaa7e9aea5ad 100644 --- a/lib/tbot/bot/bot.go +++ b/lib/tbot/bot/bot.go @@ -218,6 +218,7 @@ func (b *Bot) buildServices(ctx context.Context, registry *readyz.Registry) ([]* heartbeatService, err := b.buildHeartbeatService( identityService, startedAt, + registry, ) if err != nil { return nil, closeFn, trace.Wrap(err, "building heartbeat service") @@ -390,6 +391,7 @@ func (b *Bot) buildIdentityService( func (b *Bot) buildHeartbeatService( identityService *identity.Service, startedAt time.Time, + statusRegistry *readyz.Registry, ) (*serviceHandle, error) { handle := &serviceHandle{ serviceType: "internal/heartbeat", @@ -410,6 +412,7 @@ func (b *Bot) buildHeartbeatService( teleport.Component(teleport.ComponentTBot, handle.name), ), StatusReporter: handle.statusReporter, + StatusRegistry: statusRegistry, }) if err != nil { return nil, trace.Wrap(err) diff --git a/lib/tbot/internal/heartbeat/service.go b/lib/tbot/internal/heartbeat/service.go index a868ca1e804fe..a6da45bc3cb9e 100644 --- a/lib/tbot/internal/heartbeat/service.go +++ b/lib/tbot/internal/heartbeat/service.go @@ -26,7 +26,6 @@ import ( "time" "github.com/gravitational/trace" - "github.com/jonboulle/clockwork" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" @@ -38,6 +37,16 @@ import ( "github.com/gravitational/teleport/lib/tbot/readyz" ) +const ( + // Maximum amount of time we'll wait for services to report their initial + // status before sending the first heartbeat. + serviceHealthMaxWait = 30 * time.Second + + // Maximum amount of time the one-shot heartbeat can take once the bot has + // started shutting down. + shutdownHeartbeatTimeout = 5 * time.Second +) + // Client for the heartbeat service. type Client interface { SubmitHeartbeat( @@ -76,8 +85,9 @@ type Config struct { // StatusReporter is used to report the service's health. 
StatusReporter readyz.Reporter - // Clock that will be used to determine the current time. - Clock clockwork.Clock + // StatusRegistry is used to fetch the current service statuses when + // submitting a heartbeat. + StatusRegistry *readyz.Registry } // CheckAndSetDefaults checks the service configuration and sets any default values. @@ -91,12 +101,11 @@ func (cfg *Config) CheckAndSetDefaults() error { return trace.BadParameter("Client is required") case cfg.JoinMethod == "": return trace.BadParameter("JoinMethod is required") - } - if cfg.Clock == nil { - cfg.Clock = clockwork.NewRealClock() + case cfg.StatusRegistry == nil: + return trace.BadParameter("StatusRegistry is required") } if cfg.StartedAt.IsZero() { - cfg.StartedAt = cfg.Clock.Now() + cfg.StartedAt = time.Now() } return nil } @@ -114,6 +123,13 @@ type Service struct{ cfg Config } // Run the service in long-running mode, submitting heartbeats periodically. func (s *Service) Run(ctx context.Context) error { + // Wait for service health before sending our first heartbeat. Otherwise, we + // might report all services as "initializing" for the first ~30 minutes our + // bot is running. + if shuttingDown := s.waitForServiceHealth(ctx); shuttingDown { + return nil + } + isStartup := true err := internal.RunOnInterval(ctx, internal.RunOnIntervalConfig{ Service: s.String(), @@ -146,6 +162,21 @@ func (s *Service) Run(ctx context.Context) error { // OneShot submits one heartbeat and then exits. func (s *Service) OneShot(ctx context.Context) error { + // Wait for services to report their health before sending the heartbeat. + shuttingDown := s.waitForServiceHealth(ctx) + + if shuttingDown { + // If the outer context has been canceled (likely because another + // service has returned an error) we'll create a new one detached from + // the cancellation to try to send the heartbeat. 
+ var cancel context.CancelFunc + ctx, cancel = context.WithTimeout( + context.WithoutCancel(ctx), + shutdownHeartbeatTimeout, + ) + defer cancel() + } + err := s.heartbeat(ctx, true, true) // Ignore not implemented as this is likely confusing. // TODO(noah): Remove NotImplemented check at V18 assuming V17 first major @@ -159,6 +190,24 @@ func (s *Service) OneShot(ctx context.Context) error { // String implements fmt.Stringer. func (s *Service) String() string { return "heartbeat" } +func (s *Service) waitForServiceHealth(ctx context.Context) (shuttingDown bool) { + // We must report our own status to avoid blocking ourselves! + s.cfg.StatusReporter.Report(readyz.Healthy) + + select { + case <-s.cfg.StatusRegistry.AllServicesReported(): + // All services have reported their status, we're ready! + return false + case <-time.After(serviceHealthMaxWait): + // It's taking too long, give up and start sending heartbeats. + return false + case <-ctx.Done(): + // The outer context has been canceled (e.g. another service has exited + // or the process has received SIGINT). 
+ return true + } +} + func (s *Service) heartbeat(ctx context.Context, isOneShot, isStartup bool) error { s.cfg.Logger.DebugContext(ctx, "Sending heartbeat") hostName, err := os.Hostname() @@ -166,7 +215,7 @@ func (s *Service) heartbeat(ctx context.Context, isOneShot, isStartup bool) erro s.cfg.Logger.WarnContext(ctx, "Failed to determine hostname for heartbeat", "error", err) } - now := s.cfg.Clock.Now() + now := time.Now() hb := &machineidv1pb.BotInstanceStatusHeartbeat{ RecordedAt: timestamppb.New(now), Hostname: hostName, diff --git a/lib/tbot/internal/heartbeat/service_test.go b/lib/tbot/internal/heartbeat/service_test.go index 9a9e772fbef26..0eb152de46121 100644 --- a/lib/tbot/internal/heartbeat/service_test.go +++ b/lib/tbot/internal/heartbeat/service_test.go @@ -23,20 +23,20 @@ import ( "os" "runtime" "testing" + "testing/synctest" "time" "github.com/google/go-cmp/cmp" - "github.com/jonboulle/clockwork" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc" "google.golang.org/protobuf/testing/protocmp" "google.golang.org/protobuf/types/known/durationpb" - "google.golang.org/protobuf/types/known/timestamppb" "github.com/gravitational/teleport" machineidv1pb "github.com/gravitational/teleport/api/gen/proto/go/teleport/machineid/v1" "github.com/gravitational/teleport/api/types" + "github.com/gravitational/teleport/lib/tbot/readyz" "github.com/gravitational/teleport/lib/utils/log/logtest" ) @@ -59,56 +59,94 @@ func (f *fakeHeartbeatSubmitter) SubmitHeartbeat( func TestHeartbeatService(t *testing.T) { t.Parallel() - log := logtest.NewLogger() - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - fhs := &fakeHeartbeatSubmitter{ - ch: make(chan *machineidv1pb.SubmitHeartbeatRequest, 2), - } - - now := time.Date(2024, time.April, 1, 12, 0, 0, 0, time.UTC) - svc, err := NewService(Config{ - Interval: time.Second, - RetryLimit: 3, - Client: fhs, - Clock: clockwork.NewFakeClockAt(now), - 
StartedAt: time.Date(2024, time.April, 1, 11, 0, 0, 0, time.UTC), - Logger: log, - JoinMethod: types.JoinMethodGitHub, - BotKind: machineidv1pb.BotKind_BOT_KIND_TBOT, + synctest.Test(t, func(t *testing.T) { + log := logtest.NewLogger() + ctx, cancel := context.WithCancel(t.Context()) + t.Cleanup(cancel) + + fhs := &fakeHeartbeatSubmitter{ + ch: make(chan *machineidv1pb.SubmitHeartbeatRequest, 2), + } + + reg := readyz.NewRegistry() + svcA := reg.AddService("a") + + svc, err := NewService(Config{ + Interval: time.Second, + RetryLimit: 3, + Client: fhs, + StartedAt: time.Now().Add(-1 * time.Hour), + Logger: log, + JoinMethod: types.JoinMethodGitHub, + StatusReporter: reg.AddService("heartbeat"), + StatusRegistry: reg, + BotKind: machineidv1pb.BotKind_BOT_KIND_TBOT, + }) + require.NoError(t, err) + + hostName, err := os.Hostname() + require.NoError(t, err) + + errCh := make(chan error, 1) + go func() { + errCh <- svc.Run(ctx) + }() + + synctest.Wait() + select { + case <-fhs.ch: + t.Fatal("should not have received a heartbeat until all services have reported their status") + default: + } + + svcA.ReportReason(readyz.Unhealthy, "no more bananas") + + want := &machineidv1pb.SubmitHeartbeatRequest{ + Heartbeat: &machineidv1pb.BotInstanceStatusHeartbeat{ + Hostname: hostName, + IsStartup: true, + OneShot: false, + Uptime: durationpb.New(time.Hour), + Version: teleport.Version, + Architecture: runtime.GOARCH, + Os: runtime.GOOS, + JoinMethod: string(types.JoinMethodGitHub), + Kind: machineidv1pb.BotKind_BOT_KIND_TBOT, + }, + } + + compare := func(t *testing.T, want, got *machineidv1pb.SubmitHeartbeatRequest) { + t.Helper() + + assert.Empty(t, + cmp.Diff(want, got, + protocmp.Transform(), + protocmp.IgnoreFields(&machineidv1pb.BotInstanceStatusHeartbeat{}, "recorded_at"), + ), + ) + } + + synctest.Wait() + select { + case got := <-fhs.ch: + compare(t, want, got) + default: + t.Fatal("no heartbeat received") + } + + time.Sleep(1 * time.Second) + synctest.Wait() + + select { 
+ case got := <-fhs.ch: + want.Heartbeat.IsStartup = false + want.Heartbeat.Uptime = durationpb.New(time.Hour + time.Second) + compare(t, want, got) + default: + t.Fatal("no heartbeat received") + } + + cancel() + assert.NoError(t, <-errCh) }) - require.NoError(t, err) - - hostName, err := os.Hostname() - require.NoError(t, err) - - errCh := make(chan error, 1) - go func() { - errCh <- svc.Run(ctx) - }() - - got := <-fhs.ch - want := &machineidv1pb.SubmitHeartbeatRequest{ - Heartbeat: &machineidv1pb.BotInstanceStatusHeartbeat{ - RecordedAt: timestamppb.New(now), - Hostname: hostName, - IsStartup: true, - OneShot: false, - Uptime: durationpb.New(time.Hour), - Version: teleport.Version, - Architecture: runtime.GOARCH, - Os: runtime.GOOS, - JoinMethod: string(types.JoinMethodGitHub), - Kind: machineidv1pb.BotKind_BOT_KIND_TBOT, - }, - } - assert.Empty(t, cmp.Diff(want, got, protocmp.Transform())) - - got = <-fhs.ch - want.Heartbeat.IsStartup = false - assert.Empty(t, cmp.Diff(want, got, protocmp.Transform())) - - cancel() - assert.NoError(t, <-errCh) } diff --git a/lib/tbot/internal/identity/service.go b/lib/tbot/internal/identity/service.go index dbb94354cf567..f6c4de42d08b3 100644 --- a/lib/tbot/internal/identity/service.go +++ b/lib/tbot/internal/identity/service.go @@ -372,7 +372,6 @@ func (s *Service) Initialize(ctx context.Context) error { s.mu.Unlock() s.unblockWaiters() - s.cfg.StatusReporter.Report(readyz.Healthy) s.log.InfoContext(ctx, "Identity initialized successfully") return nil diff --git a/lib/tbot/readyz/readyz.go b/lib/tbot/readyz/readyz.go index 7682c56483b0c..f275b9ce06a8b 100644 --- a/lib/tbot/readyz/readyz.go +++ b/lib/tbot/readyz/readyz.go @@ -39,6 +39,9 @@ type Registry struct { // AddService adds a service to the registry so that its health will be reported // from our readyz endpoints. It returns a Reporter the service can use to report // status changes. 
+// +// Note: you should add all of your services before any service reports its status, +// otherwise AllServicesReported will unblock too early. func (r *Registry) AddService(name string) Reporter { r.mu.Lock() defer r.mu.Unlock()