Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions lib/tbot/bot/bot.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func (b *Bot) buildServices(ctx context.Context, registry *readyz.Registry) ([]*
heartbeatService, err := b.buildHeartbeatService(
identityService,
startedAt,
registry,
)
if err != nil {
return nil, closeFn, trace.Wrap(err, "building heartbeat service")
Expand Down Expand Up @@ -390,6 +391,7 @@ func (b *Bot) buildIdentityService(
func (b *Bot) buildHeartbeatService(
identityService *identity.Service,
startedAt time.Time,
statusRegistry *readyz.Registry,
) (*serviceHandle, error) {
handle := &serviceHandle{
serviceType: "internal/heartbeat",
Expand All @@ -410,6 +412,7 @@ func (b *Bot) buildHeartbeatService(
teleport.Component(teleport.ComponentTBot, handle.name),
),
StatusReporter: handle.statusReporter,
StatusRegistry: statusRegistry,
})
if err != nil {
return nil, trace.Wrap(err)
Expand Down
65 changes: 57 additions & 8 deletions lib/tbot/internal/heartbeat/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"time"

"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
Expand All @@ -38,6 +37,16 @@ import (
"github.com/gravitational/teleport/lib/tbot/readyz"
)

const (
	// shutdownHeartbeatTimeout bounds how long the one-shot heartbeat may take
	// once the bot has started shutting down.
	shutdownHeartbeatTimeout = 5 * time.Second

	// serviceHealthMaxWait bounds how long we wait for services to report
	// their initial status before the first heartbeat is sent.
	serviceHealthMaxWait = 30 * time.Second
)

// Client for the heartbeat service.
type Client interface {
SubmitHeartbeat(
Expand Down Expand Up @@ -76,8 +85,9 @@ type Config struct {
// StatusReporter is used to report the service's health.
StatusReporter readyz.Reporter

// Clock that will be used to determine the current time.
Clock clockwork.Clock
// StatusRegistry is used to fetch the current service statuses when
// submitting a heartbeat.
StatusRegistry *readyz.Registry
}

// CheckAndSetDefaults checks the service configuration and sets any default values.
Expand All @@ -91,12 +101,11 @@ func (cfg *Config) CheckAndSetDefaults() error {
return trace.BadParameter("Client is required")
case cfg.JoinMethod == "":
return trace.BadParameter("JoinMethod is required")
}
if cfg.Clock == nil {
cfg.Clock = clockwork.NewRealClock()
case cfg.StatusRegistry == nil:
return trace.BadParameter("StatusRegistry is required")
}
if cfg.StartedAt.IsZero() {
cfg.StartedAt = cfg.Clock.Now()
cfg.StartedAt = time.Now()
}
return nil
}
Expand All @@ -114,6 +123,13 @@ type Service struct{ cfg Config }

// Run the service in long-running mode, submitting heartbeats periodically.
func (s *Service) Run(ctx context.Context) error {
// Wait for service health before sending our first heartbeat. Otherwise, we
// might report all services as "initializing" for the first ~30 minutes our
// bot is running.
if shuttingDown := s.waitForServiceHealth(ctx); shuttingDown {
return nil
}

isStartup := true
err := internal.RunOnInterval(ctx, internal.RunOnIntervalConfig{
Service: s.String(),
Expand Down Expand Up @@ -146,6 +162,21 @@ func (s *Service) Run(ctx context.Context) error {

// OneShot submits one heartbeat and then exits.
func (s *Service) OneShot(ctx context.Context) error {
// Wait for services to report their health before sending the heartbeat.
shuttingDown := s.waitForServiceHealth(ctx)

if shuttingDown {
// If the outer context has been canceled (likely because another
// service has returned an error) we'll create a new one detached from
// the cancellation to try to send the heartbeat.
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(
context.WithoutCancel(ctx),
shutdownHeartbeatTimeout,
)
defer cancel()
}

err := s.heartbeat(ctx, true, true)
// Ignore not implemented as this is likely confusing.
// TODO(noah): Remove NotImplemented check at V18 assuming V17 first major
Expand All @@ -159,14 +190,32 @@ func (s *Service) OneShot(ctx context.Context) error {
// String implements fmt.Stringer.
func (s *Service) String() string { return "heartbeat" }

// waitForServiceHealth blocks until one of the following happens:
//
//   - every service in the status registry has reported its status,
//   - serviceHealthMaxWait has elapsed, or
//   - ctx is done.
//
// It returns true only in the last case, i.e. when the outer context was
// canceled (e.g. another service has exited or the process has received
// SIGINT). In the other two cases it returns false and the caller may go ahead
// and send heartbeats.
func (s *Service) waitForServiceHealth(ctx context.Context) (shuttingDown bool) {
	// The heartbeat service reports its own status first; otherwise it would
	// be blocking itself while waiting on the registry below.
	s.cfg.StatusReporter.Report(readyz.Healthy)

	allReported := s.cfg.StatusRegistry.AllServicesReported()
	waitExpired := time.After(serviceHealthMaxWait)

	select {
	case <-ctx.Done():
		// Shutdown was requested before the services finished reporting.
		return true
	case <-allReported:
		// Every service has reported its status, we're ready!
	case <-waitExpired:
		// It's taking too long, give up and start sending heartbeats anyway.
	}
	return false
}

func (s *Service) heartbeat(ctx context.Context, isOneShot, isStartup bool) error {
s.cfg.Logger.DebugContext(ctx, "Sending heartbeat")
hostName, err := os.Hostname()
if err != nil {
s.cfg.Logger.WarnContext(ctx, "Failed to determine hostname for heartbeat", "error", err)
}

now := s.cfg.Clock.Now()
now := time.Now()
hb := &machineidv1pb.BotInstanceStatusHeartbeat{
RecordedAt: timestamppb.New(now),
Hostname: hostName,
Expand Down
144 changes: 91 additions & 53 deletions lib/tbot/internal/heartbeat/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,20 @@ import (
"os"
"runtime"
"testing"
"testing/synctest"
"time"

"github.com/google/go-cmp/cmp"
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/gravitational/teleport"
machineidv1pb "github.com/gravitational/teleport/api/gen/proto/go/teleport/machineid/v1"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/tbot/readyz"
"github.com/gravitational/teleport/lib/utils/log/logtest"
)

Expand All @@ -59,56 +59,94 @@ func (f *fakeHeartbeatSubmitter) SubmitHeartbeat(
func TestHeartbeatService(t *testing.T) {
t.Parallel()

log := logtest.NewLogger()
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

fhs := &fakeHeartbeatSubmitter{
ch: make(chan *machineidv1pb.SubmitHeartbeatRequest, 2),
}

now := time.Date(2024, time.April, 1, 12, 0, 0, 0, time.UTC)
svc, err := NewService(Config{
Interval: time.Second,
RetryLimit: 3,
Client: fhs,
Clock: clockwork.NewFakeClockAt(now),
StartedAt: time.Date(2024, time.April, 1, 11, 0, 0, 0, time.UTC),
Logger: log,
JoinMethod: types.JoinMethodGitHub,
BotKind: machineidv1pb.BotKind_BOT_KIND_TBOT,
synctest.Test(t, func(t *testing.T) {
log := logtest.NewLogger()
ctx, cancel := context.WithCancel(t.Context())
t.Cleanup(cancel)

fhs := &fakeHeartbeatSubmitter{
ch: make(chan *machineidv1pb.SubmitHeartbeatRequest, 2),
}

reg := readyz.NewRegistry()
svcA := reg.AddService("a")

svc, err := NewService(Config{
Interval: time.Second,
RetryLimit: 3,
Client: fhs,
StartedAt: time.Now().Add(-1 * time.Hour),
Logger: log,
JoinMethod: types.JoinMethodGitHub,
StatusReporter: reg.AddService("heartbeat"),
StatusRegistry: reg,
BotKind: machineidv1pb.BotKind_BOT_KIND_TBOT,
})
require.NoError(t, err)

hostName, err := os.Hostname()
require.NoError(t, err)

errCh := make(chan error, 1)
go func() {
errCh <- svc.Run(ctx)
}()

synctest.Wait()
select {
case <-fhs.ch:
t.Fatal("should not have received a heartbeat until all services have reported their status")
default:
}

svcA.ReportReason(readyz.Unhealthy, "no more bananas")

want := &machineidv1pb.SubmitHeartbeatRequest{
Heartbeat: &machineidv1pb.BotInstanceStatusHeartbeat{
Hostname: hostName,
IsStartup: true,
OneShot: false,
Uptime: durationpb.New(time.Hour),
Version: teleport.Version,
Architecture: runtime.GOARCH,
Os: runtime.GOOS,
JoinMethod: string(types.JoinMethodGitHub),
Kind: machineidv1pb.BotKind_BOT_KIND_TBOT,
},
}

compare := func(t *testing.T, want, got *machineidv1pb.SubmitHeartbeatRequest) {
t.Helper()

assert.Empty(t,
cmp.Diff(want, got,
protocmp.Transform(),
protocmp.IgnoreFields(&machineidv1pb.BotInstanceStatusHeartbeat{}, "recorded_at"),
),
)
}

synctest.Wait()
select {
case got := <-fhs.ch:
compare(t, want, got)
default:
t.Fatal("no heartbeat received")
}

time.Sleep(1 * time.Second)
synctest.Wait()

select {
case got := <-fhs.ch:
want.Heartbeat.IsStartup = false
want.Heartbeat.Uptime = durationpb.New(time.Hour + time.Second)
compare(t, want, got)
default:
t.Fatal("no heartbeat received")
}

cancel()
assert.NoError(t, <-errCh)
})
require.NoError(t, err)

hostName, err := os.Hostname()
require.NoError(t, err)

errCh := make(chan error, 1)
go func() {
errCh <- svc.Run(ctx)
}()

got := <-fhs.ch
want := &machineidv1pb.SubmitHeartbeatRequest{
Heartbeat: &machineidv1pb.BotInstanceStatusHeartbeat{
RecordedAt: timestamppb.New(now),
Hostname: hostName,
IsStartup: true,
OneShot: false,
Uptime: durationpb.New(time.Hour),
Version: teleport.Version,
Architecture: runtime.GOARCH,
Os: runtime.GOOS,
JoinMethod: string(types.JoinMethodGitHub),
Kind: machineidv1pb.BotKind_BOT_KIND_TBOT,
},
}
assert.Empty(t, cmp.Diff(want, got, protocmp.Transform()))

got = <-fhs.ch
want.Heartbeat.IsStartup = false
assert.Empty(t, cmp.Diff(want, got, protocmp.Transform()))

cancel()
assert.NoError(t, <-errCh)
}
1 change: 0 additions & 1 deletion lib/tbot/internal/identity/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,6 @@ func (s *Service) Initialize(ctx context.Context) error {
s.mu.Unlock()

s.unblockWaiters()
s.cfg.StatusReporter.Report(readyz.Healthy)

s.log.InfoContext(ctx, "Identity initialized successfully")
return nil
Expand Down
3 changes: 3 additions & 0 deletions lib/tbot/readyz/readyz.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type Registry struct {
// AddService adds a service to the registry so that its health will be reported
// from our readyz endpoints. It returns a Reporter the service can use to report
// status changes.
//
// Note: you should add all of your services before any service reports its
// status; otherwise, AllServicesReported will unblock too early.
func (r *Registry) AddService(name string) Reporter {
r.mu.Lock()
defer r.mu.Unlock()
Expand Down
Loading