Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions api/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,3 +551,8 @@ const (

// MaxPIVPINCacheTTL defines the maximum allowed TTL for PIV PIN client caches.
const MaxPIVPINCacheTTL = time.Hour

// AutoUpdateAgentReportPeriod is the period of the autoupdate agent reporting
// routine running in every auth server. Any report older than this period should
// be considered stale.
const AutoUpdateAgentReportPeriod = time.Minute
3 changes: 2 additions & 1 deletion lib/auth/agent_version_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/gravitational/trace"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/gravitational/teleport/api/constants"
autoupdatev1pb "github.com/gravitational/teleport/api/gen/proto/go/teleport/autoupdate/v1"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/types/autoupdate"
Expand Down Expand Up @@ -55,7 +56,7 @@ func (ir instanceReport) collectInstance(handle inventory.UpstreamHandle) {

// We skip servers that joined less than a minute ago as they might have been
// connected to another auth instance a few seconds ago, which would lead to double-counting.
if ir.timestamp.Sub(handle.RegistrationTime()) < time.Minute {
if ir.timestamp.Sub(handle.RegistrationTime()) < constants.AutoUpdateAgentReportPeriod {
return
}
// We skip control planes instances because we don't update them.
Expand Down
4 changes: 2 additions & 2 deletions lib/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -1546,8 +1546,8 @@ func (a *Server) runPeriodicOperations() {
})
ticker.Push(interval.SubInterval[periodicIntervalKey]{
Key: autoUpdateAgentReportKey,
Duration: time.Minute,
FirstDuration: retryutils.FullJitter(time.Minute),
Duration: constants.AutoUpdateAgentReportPeriod,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we ensure that 1m is never reached as max duration? ticker delay + network + backend delays can make agents be marked as stalled

Copy link
Copy Markdown
Contributor Author

@hugoShaka hugoShaka May 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ticker delay + network + backend delays can make agents be marked as stalled

Absolutely, that's a great point.

If a report is stale and we see a temporary agent drop it's not going to affect the rollout, maybe delay it by 30 seconds. The opposite, double-counting agents as they reconnect to a different auth, would be more problematic I think, as it would lead to a rollout progressing while not all agents are updated.

Maybe I'm wrong and the drops will prove to be an issue, but I'd rather end up with slow rollout than a too hasty one. If we observe issues in cloud we will fix the logic accordingly.

FirstDuration: retryutils.FullJitter(constants.AutoUpdateAgentReportPeriod),
// No jitter here, this is intentional and required for accurate tracking across auths.
})
}
Expand Down
27 changes: 25 additions & 2 deletions lib/auth/autoupdate/autoupdatev1/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,25 @@ func (s *Service) DeleteAutoUpdateAgentRollout(ctx context.Context, req *autoupd
return &emptypb.Empty{}, nil
}

func (s *Service) getAllReports(ctx context.Context) ([]*autoupdate.AutoUpdateAgentReport, error) {
reports := make([]*autoupdate.AutoUpdateAgentReport, 0)

// this is an in-memory client, we go for the default page size
const pageSize = 0
var pageToken string
for {
page, nextToken, err := s.cache.ListAutoUpdateAgentReports(ctx, pageSize, pageToken)
if err != nil {
return nil, trace.Wrap(err)
}
reports = append(reports, page...)
if nextToken == "" {
return reports, nil
}
pageToken = nextToken
}
}

// TriggerAutoUpdateAgentGroup triggers automatic updates for one or many groups
// in the rollout.
func (s *Service) TriggerAutoUpdateAgentGroup(ctx context.Context, req *autoupdate.TriggerAutoUpdateAgentGroupRequest) (result *autoupdate.AutoUpdateAgentRollout, err error) {
Expand Down Expand Up @@ -702,10 +721,14 @@ func (s *Service) TriggerAutoUpdateAgentGroup(ctx context.Context, req *autoupda
for range maxTries {
existingRollout, err = s.backend.GetAutoUpdateAgentRollout(ctx)
if err != nil {
return nil, trace.Wrap(err)
return nil, trace.Wrap(err, "getting rollout")
}
reports, err := s.getAllReports(ctx)
if err != nil {
return nil, trace.Wrap(err, "getting reports")
}

err = rollout.TriggerGroups(existingRollout, rollout.GroupListToGroupSet(req.Groups), req.DesiredState, s.clock.Now())
err = rollout.TriggerGroups(existingRollout, reports, rollout.GroupListToGroupSet(req.Groups), req.DesiredState, s.clock.Now())
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down
38 changes: 38 additions & 0 deletions lib/autoupdate/rollout/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ package rollout

import (
"context"
"time"

"github.com/gravitational/trace"

"github.com/gravitational/teleport/api/constants"
autoupdatepb "github.com/gravitational/teleport/api/gen/proto/go/teleport/autoupdate/v1"
"github.com/gravitational/teleport/api/types"
)
Expand Down Expand Up @@ -52,3 +56,37 @@ type Client interface {
// so the controller can measure the rollout progress.
ListAutoUpdateAgentReports(ctx context.Context, pageSize int, nextKey string) ([]*autoupdatepb.AutoUpdateAgentReport, string, error)
}

func getAllReports(ctx context.Context, clt Client) ([]*autoupdatepb.AutoUpdateAgentReport, error) {
var reports []*autoupdatepb.AutoUpdateAgentReport

// this is an in-memory client, we go for the max page size
const pageSize = 0
var pageToken string
for {
page, nextToken, err := clt.ListAutoUpdateAgentReports(ctx, pageSize, pageToken)
if err != nil {
return nil, trace.Wrap(err)
}
reports = append(reports, page...)
if nextToken == "" {
return reports, nil
}
pageToken = nextToken
}
}

func getAllValidReports(ctx context.Context, clt Client, now time.Time) ([]*autoupdatepb.AutoUpdateAgentReport, error) {
allReports, err := getAllReports(ctx, clt)
if err != nil && !trace.IsNotFound(err) {
return nil, trace.Wrap(err, "getting all reports")
}

validReports := make([]*autoupdatepb.AutoUpdateAgentReport, len(allReports))
for _, report := range allReports {
if now.Sub(report.GetSpec().GetTimestamp().AsTime()) <= constants.AutoUpdateAgentReportPeriod {
validReports = append(validReports, report)
}
}
return validReports, nil
}
2 changes: 1 addition & 1 deletion lib/autoupdate/rollout/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func NewController(client Client, log *slog.Logger, clock clockwork.Clock, perio

log = log.With(teleport.ComponentLabel, teleport.ComponentRolloutController)

haltOnError, err := newHaltOnErrorStrategy(log)
haltOnError, err := newHaltOnErrorStrategy(log, client)
if err != nil {
return nil, trace.Wrap(err, "failed to initialize halt-on-error strategy")
}
Expand Down
110 changes: 102 additions & 8 deletions lib/autoupdate/rollout/strategy_haltonerror.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package rollout
import (
"context"
"log/slog"
"slices"
"time"

"github.com/gravitational/trace"
Expand All @@ -40,22 +41,34 @@ const (

type haltOnErrorStrategy struct {
log *slog.Logger
clt Client
}

func (h *haltOnErrorStrategy) name() string {
return update.AgentsStrategyHaltOnError
}

func newHaltOnErrorStrategy(log *slog.Logger) (rolloutStrategy, error) {
func newHaltOnErrorStrategy(log *slog.Logger, clt Client) (rolloutStrategy, error) {
if log == nil {
return nil, trace.BadParameter("missing log")
}
if clt == nil {
return nil, trace.BadParameter("missing Client")
}
return &haltOnErrorStrategy{
log: log.With("strategy", update.AgentsStrategyHaltOnError),
clt: clt,
}, nil
}

func (h *haltOnErrorStrategy) progressRollout(ctx context.Context, _ *autoupdate.AutoUpdateAgentRolloutSpec, status *autoupdate.AutoUpdateAgentRolloutStatus, now time.Time) error {
func (h *haltOnErrorStrategy) progressRollout(ctx context.Context, spec *autoupdate.AutoUpdateAgentRolloutSpec, status *autoupdate.AutoUpdateAgentRolloutStatus, now time.Time) error {
reports, err := getAllValidReports(ctx, h.clt, now)
if err != nil && !trace.IsNotFound(err) {
return trace.Wrap(err)
}

countByGroup, upToDateByGroup := countUpToDate(reports, spec.GetTargetVersion())

// We process every group in order, all the previous groups must be in the DONE state
// for the next group to become active. Even if some early groups are not DONE,
// later groups might be ACTIVE and need to transition to DONE, so we cannot
Expand All @@ -67,6 +80,17 @@ func (h *haltOnErrorStrategy) progressRollout(ctx context.Context, _ *autoupdate
previousGroupsAreDone := true

for i, group := range status.Groups {
var agentCount, agentUpToDateCount int
if i == len(status.Groups)-1 {
agentCount, agentUpToDateCount = countCatchAll(status, countByGroup, upToDateByGroup)
} else {
agentCount = countByGroup[group.GetName()]
agentUpToDateCount = upToDateByGroup[group.GetName()]
}

group.PresentCount = uint64(agentCount)
group.UpToDateCount = uint64(agentUpToDateCount)

switch group.State {
case autoupdate.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_UNSTARTED:
var previousGroup *autoupdate.AutoUpdateAgentRolloutStatusGroup
Expand Down Expand Up @@ -105,6 +129,7 @@ func (h *haltOnErrorStrategy) progressRollout(ctx context.Context, _ *autoupdate
// All previous groups are DONE and time-related criteria are met.
// We can start.
setGroupState(group, autoupdate.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_ACTIVE, updateReasonCanStart, now)
group.InitialCount = uint64(agentCount)
}
previousGroupsAreDone = false
case autoupdate.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_ROLLEDBACK:
Expand All @@ -122,8 +147,8 @@ func (h *haltOnErrorStrategy) progressRollout(ctx context.Context, _ *autoupdate
setGroupState(group, autoupdate.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_DONE, reason, now)
} else {
setGroupState(group, autoupdate.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_ACTIVE, reason, now)
previousGroupsAreDone = false
}
previousGroupsAreDone = false

default:
return trace.BadParameter("unknown autoupdate group state: %v", group.State)
Expand Down Expand Up @@ -153,12 +178,81 @@ func canStartHaltOnError(group, previousGroup *autoupdate.AutoUpdateAgentRollout
return inWindow(group, now, haltOnErrorWindowDuration)
}

const (
// Currently hardcoded maxInFlight, we might add a user-facing per-group
// value in the future.
maxInFlight = 0.10
doneThreshold = 1 - maxInFlight
)

func isDoneHaltOnError(group *autoupdate.AutoUpdateAgentRolloutStatusGroup, now time.Time) (bool, string) {
// Currently we don't implement status reporting from groups/agents.
// So we just wait 60 minutes and consider the maintenance done.
// This will change as we introduce agent status report and aggregated agent counts.
if group.StartTime.AsTime().Add(haltOnErrorWindowDuration).Before(now) {
switch {
case group.InitialCount == 0:
// Currently we don't implement status reporting from groups/agents.
// So we just wait 60 minutes and consider the maintenance done.
if group.StartTime.AsTime().Add(haltOnErrorWindowDuration).Before(now) {
return true, updateReasonUpdateComplete
}
return false, updateReasonUpdateInProgress
case float64(group.PresentCount)/float64(group.InitialCount) >= doneThreshold && float64(group.UpToDateCount)/float64(group.PresentCount) >= doneThreshold:
return true, updateReasonUpdateComplete
default:
return false, updateReasonUpdateInProgress
}
return false, updateReasonUpdateInProgress
}

// countUpToDate iterates over all the reports and aggregates the counts by reported groups.
// The function returns two maps:
// - the number of agents belonging to each reported group
// - the number of up-to-date agents belonging to each reported group
func countUpToDate(
reports []*autoupdate.AutoUpdateAgentReport,
targetVersion string,
) (countByGroup, upToDateByGroup map[string]int) {
countByGroup = make(map[string]int)
upToDateByGroup = make(map[string]int)

for _, report := range reports {
for group, groupCount := range report.GetSpec().GetGroups() {
for version, versionCount := range groupCount.GetVersions() {
countByGroup[group] = countByGroup[group] + int(versionCount.GetCount())
if version == targetVersion {
upToDateByGroup[group] = upToDateByGroup[group] + int(versionCount.GetCount())
}
}
}
}
return countByGroup, upToDateByGroup
}

// CountCatchAll counts the number of agents belonging to the last group which is acting like a catch-all.
// The function returns two integers:
// - the number of agents belonging to the last group
// - the number of up-to-date agents belonging to the last group
func countCatchAll(rolloutStatus *autoupdate.AutoUpdateAgentRolloutStatus, countByGroup, upToDateByGroup map[string]int) (int, int) {
if len(rolloutStatus.GetGroups()) == 0 {
return 0, 0
}

rolloutGroups := make([]string, 0, len(rolloutStatus.GetGroups()))
// We don't count the last group as it is the default one
for _, group := range rolloutStatus.GetGroups()[:len(rolloutStatus.GetGroups())-1] {
rolloutGroups = append(rolloutGroups, group.GetName())
}

var defaultGroupCount, upToDateDefaultGroupCount int

for group, count := range countByGroup {
if !slices.Contains(rolloutGroups, group) {
defaultGroupCount += count
}
}

for group, count := range upToDateByGroup {
if !slices.Contains(rolloutGroups, group) {
upToDateDefaultGroupCount += count
}
}

return defaultGroupCount, upToDateDefaultGroupCount
}
Loading
Loading