From 480897d3312f91b7aba266803eb080929ec4e761 Mon Sep 17 00:00:00 2001 From: hugoShaka Date: Fri, 23 May 2025 16:47:43 -0400 Subject: [PATCH 1/3] Make halt-on-error autoupdate strategy use agent reports --- lib/auth/autoupdate/autoupdatev1/service.go | 27 +- lib/autoupdate/rollout/controller.go | 2 +- .../rollout/strategy_haltonerror.go | 145 +++++++++- .../rollout/strategy_haltonerror_test.go | 248 +++++++++++++++++- lib/autoupdate/rollout/transitions.go | 23 +- lib/autoupdate/rollout/transitions_test.go | 124 ++++++++- 6 files changed, 543 insertions(+), 26 deletions(-) diff --git a/lib/auth/autoupdate/autoupdatev1/service.go b/lib/auth/autoupdate/autoupdatev1/service.go index 4a111fe262cb5..6bad36a12f68f 100644 --- a/lib/auth/autoupdate/autoupdatev1/service.go +++ b/lib/auth/autoupdate/autoupdatev1/service.go @@ -659,6 +659,25 @@ func (s *Service) DeleteAutoUpdateAgentRollout(ctx context.Context, req *autoupd return &emptypb.Empty{}, nil } +func (s *Service) getAllReports(ctx context.Context) ([]*autoupdate.AutoUpdateAgentReport, error) { + reports := make([]*autoupdate.AutoUpdateAgentReport, 0) + + // this is an in-memory client, we go for the max page size + const pageSize = 0 + var pageToken string + for { + page, nextToken, err := s.cache.ListAutoUpdateAgentReports(ctx, pageSize, pageToken) + if err != nil { + return nil, trace.Wrap(err) + } + reports = append(reports, page...) + if nextToken == "" { + return reports, nil + } + pageToken = nextToken + } +} + // TriggerAutoUpdateAgentGroup triggers automatic updates for one or many groups // in the rollout. 
func (s *Service) TriggerAutoUpdateAgentGroup(ctx context.Context, req *autoupdate.TriggerAutoUpdateAgentGroupRequest) (result *autoupdate.AutoUpdateAgentRollout, err error) { @@ -702,10 +721,14 @@ func (s *Service) TriggerAutoUpdateAgentGroup(ctx context.Context, req *autoupda for range maxTries { existingRollout, err = s.backend.GetAutoUpdateAgentRollout(ctx) if err != nil { - return nil, trace.Wrap(err) + return nil, trace.Wrap(err, "getting rollout") + } + reports, err := s.getAllReports(ctx) + if err != nil { + return nil, trace.Wrap(err, "getting reports") } - err = rollout.TriggerGroups(existingRollout, rollout.GroupListToGroupSet(req.Groups), req.DesiredState, s.clock.Now()) + err = rollout.TriggerGroups(existingRollout, reports, rollout.GroupListToGroupSet(req.Groups), req.DesiredState, s.clock.Now()) if err != nil { return nil, trace.Wrap(err) } diff --git a/lib/autoupdate/rollout/controller.go b/lib/autoupdate/rollout/controller.go index 09aa61c442f4d..477fb1e48121f 100644 --- a/lib/autoupdate/rollout/controller.go +++ b/lib/autoupdate/rollout/controller.go @@ -73,7 +73,7 @@ func NewController(client Client, log *slog.Logger, clock clockwork.Clock, perio log = log.With(teleport.ComponentLabel, teleport.ComponentRolloutController) - haltOnError, err := newHaltOnErrorStrategy(log) + haltOnError, err := newHaltOnErrorStrategy(log, client) if err != nil { return nil, trace.Wrap(err, "failed to initialize halt-on-error strategy") } diff --git a/lib/autoupdate/rollout/strategy_haltonerror.go b/lib/autoupdate/rollout/strategy_haltonerror.go index fafc5d5ae30d3..6e0df8957f7a9 100644 --- a/lib/autoupdate/rollout/strategy_haltonerror.go +++ b/lib/autoupdate/rollout/strategy_haltonerror.go @@ -21,6 +21,7 @@ package rollout import ( "context" "log/slog" + "slices" "time" "github.com/gravitational/trace" @@ -40,22 +41,69 @@ const ( type haltOnErrorStrategy struct { log *slog.Logger + clt Client } func (h *haltOnErrorStrategy) name() string { return 
update.AgentsStrategyHaltOnError } -func newHaltOnErrorStrategy(log *slog.Logger) (rolloutStrategy, error) { +func newHaltOnErrorStrategy(log *slog.Logger, clt Client) (rolloutStrategy, error) { if log == nil { return nil, trace.BadParameter("missing log") } + if clt == nil { + return nil, trace.BadParameter("missing Client") + } return &haltOnErrorStrategy{ log: log.With("strategy", update.AgentsStrategyHaltOnError), + clt: clt, }, nil } -func (h *haltOnErrorStrategy) progressRollout(ctx context.Context, _ *autoupdate.AutoUpdateAgentRolloutSpec, status *autoupdate.AutoUpdateAgentRolloutStatus, now time.Time) error { +func (h *haltOnErrorStrategy) getAllReports(ctx context.Context) ([]*autoupdate.AutoUpdateAgentReport, error) { + reports := make([]*autoupdate.AutoUpdateAgentReport, 0) + + // this is an in-memory client, we go for the max page size + const pageSize = 0 + var pageToken string + for { + page, nextToken, err := h.clt.ListAutoUpdateAgentReports(ctx, pageSize, pageToken) + if err != nil { + return nil, trace.Wrap(err) + } + reports = append(reports, page...) 
+ if nextToken == "" { + return reports, nil + } + pageToken = nextToken + } +} + +func (h *haltOnErrorStrategy) getAllValidReports(ctx context.Context, now time.Time) ([]*autoupdate.AutoUpdateAgentReport, error) { + allReports, err := h.getAllReports(ctx) + if err != nil && !trace.IsNotFound(err) { + return nil, trace.Wrap(err, "getting all reports") + } + + validReports := make([]*autoupdate.AutoUpdateAgentReport, len(allReports)) + for _, report := range allReports { + // TODO replace time.minute by the auth periodic operation constant + if now.Sub(report.GetSpec().GetTimestamp().AsTime()) <= time.Minute { + validReports = append(validReports, report) + } + } + return validReports, nil +} + +func (h *haltOnErrorStrategy) progressRollout(ctx context.Context, spec *autoupdate.AutoUpdateAgentRolloutSpec, status *autoupdate.AutoUpdateAgentRolloutStatus, now time.Time) error { + reports, err := h.getAllValidReports(ctx, now) + if err != nil && !trace.IsNotFound(err) { + return trace.Wrap(err) + } + + countByGroup, upToDateByGroup := countUpToDate(reports, spec.GetTargetVersion()) + // We process every group in order, all the previous groups must be in the DONE state // for the next group to become active. 
Even if some early groups are not DONE, // later groups might be ACTIVE and need to transition to DONE, so we cannot @@ -67,6 +115,17 @@ func (h *haltOnErrorStrategy) progressRollout(ctx context.Context, _ *autoupdate previousGroupsAreDone := true for i, group := range status.Groups { + var agentCount, agentUpToDateCount int + if i == len(status.Groups)-1 { + agentCount, agentUpToDateCount = countCatchAll(status, countByGroup, upToDateByGroup) + } else { + agentCount = countByGroup[group.GetName()] + agentUpToDateCount = upToDateByGroup[group.GetName()] + } + + group.PresentCount = uint64(agentCount) + group.UpToDateCount = uint64(agentUpToDateCount) + switch group.State { case autoupdate.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_UNSTARTED: var previousGroup *autoupdate.AutoUpdateAgentRolloutStatusGroup @@ -105,6 +164,7 @@ func (h *haltOnErrorStrategy) progressRollout(ctx context.Context, _ *autoupdate // All previous groups are DONE and time-related criteria are met. // We can start. 
setGroupState(group, autoupdate.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_ACTIVE, updateReasonCanStart, now) + group.InitialCount = uint64(agentCount) } previousGroupsAreDone = false case autoupdate.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_ROLLEDBACK: @@ -122,8 +182,8 @@ func (h *haltOnErrorStrategy) progressRollout(ctx context.Context, _ *autoupdate setGroupState(group, autoupdate.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_DONE, reason, now) } else { setGroupState(group, autoupdate.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_ACTIVE, reason, now) + previousGroupsAreDone = false } - previousGroupsAreDone = false default: return trace.BadParameter("unknown autoupdate group state: %v", group.State) @@ -153,12 +213,81 @@ func canStartHaltOnError(group, previousGroup *autoupdate.AutoUpdateAgentRollout return inWindow(group, now, haltOnErrorWindowDuration) } +const ( + // Currently hardcoded maxInFlight, we might add a user-facing per-group + // value in the future. + maxInFlight = 0.10 + doneThreshold = 1 - maxInFlight +) + func isDoneHaltOnError(group *autoupdate.AutoUpdateAgentRolloutStatusGroup, now time.Time) (bool, string) { - // Currently we don't implement status reporting from groups/agents. - // So we just wait 60 minutes and consider the maintenance done. - // This will change as we introduce agent status report and aggregated agent counts. - if group.StartTime.AsTime().Add(haltOnErrorWindowDuration).Before(now) { + switch { + case group.InitialCount == 0: + // Currently we don't implement status reporting from groups/agents. + // So we just wait 60 minutes and consider the maintenance done. 
+ if group.StartTime.AsTime().Add(haltOnErrorWindowDuration).Before(now) { + return true, updateReasonUpdateComplete + } + return false, updateReasonUpdateInProgress + case float64(group.PresentCount)/float64(group.InitialCount) >= doneThreshold && float64(group.UpToDateCount)/float64(group.PresentCount) >= doneThreshold: return true, updateReasonUpdateComplete + default: + return false, updateReasonUpdateInProgress + } +} + +// countUpToDate iterates over all the reports and aggregates the counts by reported groups. +// The function returns two maps: +// - the number of agents belonging to each reported group +// - the number of up-to-date agents belonging to each reported group +func countUpToDate( + reports []*autoupdate.AutoUpdateAgentReport, + targetVersion string, +) (countByGroup, upToDateByGroup map[string]int) { + countByGroup = make(map[string]int) + upToDateByGroup = make(map[string]int) + + for _, report := range reports { + for group, groupCount := range report.GetSpec().GetGroups() { + for version, versionCount := range groupCount.GetVersions() { + countByGroup[group] = countByGroup[group] + int(versionCount.GetCount()) + if version == targetVersion { + upToDateByGroup[group] = upToDateByGroup[group] + int(versionCount.GetCount()) + } + } + } + } + return countByGroup, upToDateByGroup +} + +// CountCatchAll counts the number of agents belonging to the last group which is acting like a catch-all. 
+// The function returns two integers: +// - the number of agents belonging to the last group +// - the number of up-to-date agents belonging to the last group +func countCatchAll(rolloutStatus *autoupdate.AutoUpdateAgentRolloutStatus, countByGroup, upToDateByGroup map[string]int) (int, int) { + if len(rolloutStatus.GetGroups()) == 0 { + return 0, 0 } - return false, updateReasonUpdateInProgress + + rolloutGroups := make([]string, 0, len(rolloutStatus.GetGroups())) + // We don't count the last group as it is the default one + for _, group := range rolloutStatus.GetGroups()[:len(rolloutStatus.GetGroups())-1] { + rolloutGroups = append(rolloutGroups, group.GetName()) + } + + var defaultGroupCount, upToDateDefaultGroupCount int + + for group, count := range countByGroup { + if !slices.Contains(rolloutGroups, group) { + defaultGroupCount += count + } + } + + for group, count := range upToDateByGroup { + if !slices.Contains(rolloutGroups, group) { + upToDateDefaultGroupCount += count + } + } + + return defaultGroupCount, upToDateDefaultGroupCount } diff --git a/lib/autoupdate/rollout/strategy_haltonerror_test.go b/lib/autoupdate/rollout/strategy_haltonerror_test.go index 2f59534ddd7db..b5de3279a4847 100644 --- a/lib/autoupdate/rollout/strategy_haltonerror_test.go +++ b/lib/autoupdate/rollout/strategy_haltonerror_test.go @@ -23,11 +23,13 @@ import ( "testing" "time" + "github.com/gravitational/trace" "github.com/jonboulle/clockwork" "github.com/stretchr/testify/require" "google.golang.org/protobuf/types/known/timestamppb" "github.com/gravitational/teleport/api/gen/proto/go/teleport/autoupdate/v1" + headerv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/header/v1" "github.com/gravitational/teleport/lib/utils" ) @@ -134,22 +136,72 @@ func Test_canStartHaltOnError(t *testing.T) { func Test_progressGroupsHaltOnError(t *testing.T) { clock := clockwork.NewFakeClockAt(testSunday) log := utils.NewSlogLoggerForTests() - strategy, err := newHaltOnErrorStrategy(log) 
- require.NoError(t, err) + fewSecondsAgo := clock.Now().Add(-3 * time.Second) fewMinutesAgo := clock.Now().Add(-5 * time.Minute) yesterday := testSaturday canStartToday := everyWeekday cannotStartToday := everyWeekdayButSunday ctx := context.Background() + startVersion := "1.2.3" + targetVersion := "1.2.4" + otherVersion := "1.2.5" + group1Name := "group1" group2Name := "group2" group3Name := "group3" + testReports := []*autoupdate.AutoUpdateAgentReport{ + { + Metadata: &headerv1.Metadata{Name: "auth1"}, + Spec: &autoupdate.AutoUpdateAgentReportSpec{ + Timestamp: timestamppb.New(fewSecondsAgo), + Groups: map[string]*autoupdate.AutoUpdateAgentReportSpecGroup{ + group1Name: { + Versions: map[string]*autoupdate.AutoUpdateAgentReportSpecGroupVersion{ + startVersion: {Count: 4}, + targetVersion: {Count: 5}, + otherVersion: {Count: 1}, + }, + }, + group2Name: { + Versions: map[string]*autoupdate.AutoUpdateAgentReportSpecGroupVersion{ + startVersion: {Count: 5}, + targetVersion: {Count: 5}, + }, + }, + }, + }, + }, + { + // This report is expired, it must be ignored + Metadata: &headerv1.Metadata{Name: "auth2"}, + Spec: &autoupdate.AutoUpdateAgentReportSpec{ + Timestamp: timestamppb.New(fewMinutesAgo), + Groups: map[string]*autoupdate.AutoUpdateAgentReportSpecGroup{ + group1Name: { + Versions: map[string]*autoupdate.AutoUpdateAgentReportSpecGroupVersion{ + startVersion: {Count: 123}, + targetVersion: {Count: 123}, + otherVersion: {Count: 123}, + }, + }, + group2Name: { + Versions: map[string]*autoupdate.AutoUpdateAgentReportSpecGroupVersion{ + startVersion: {Count: 123}, + targetVersion: {Count: 123}, + }, + }, + }, + }, + }, + } + tests := []struct { name string initialState []*autoupdate.AutoUpdateAgentRolloutStatusGroup + reports []*autoupdate.AutoUpdateAgentReport rolloutStartTime *timestamppb.Timestamp expectedState []*autoupdate.AutoUpdateAgentRolloutStatusGroup }{ @@ -492,6 +544,172 @@ func Test_progressGroupsHaltOnError(t *testing.T) { }, }, }, + { + name: 
"single group unstarted -> unstarted with reports", + initialState: []*autoupdate.AutoUpdateAgentRolloutStatusGroup{ + { + Name: group1Name, + State: autoupdate.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_UNSTARTED, + LastUpdateTime: timestamppb.New(yesterday), + LastUpdateReason: updateReasonCreated, + ConfigDays: cannotStartToday, + ConfigStartHour: matchingStartHour, + }, + }, + reports: testReports, + expectedState: []*autoupdate.AutoUpdateAgentRolloutStatusGroup{ + { + Name: group1Name, + State: autoupdate.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_UNSTARTED, + LastUpdateTime: timestamppb.New(clock.Now()), + LastUpdateReason: updateReasonCannotStart, + ConfigDays: cannotStartToday, + ConfigStartHour: matchingStartHour, + // Group1 is the catch-all group, so it should count group2 agents + PresentCount: 20, + UpToDateCount: 10, + }, + }, + }, + { + name: "single group active -> active with reports", + initialState: []*autoupdate.AutoUpdateAgentRolloutStatusGroup{ + { + Name: group1Name, + State: autoupdate.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_ACTIVE, + StartTime: timestamppb.New(fewMinutesAgo), + LastUpdateTime: timestamppb.New(fewMinutesAgo), + LastUpdateReason: updateReasonCanStart, + ConfigDays: canStartToday, + ConfigStartHour: matchingStartHour, + InitialCount: 25, + UpToDateCount: 0, + PresentCount: 10, + }, + }, + reports: testReports, + expectedState: []*autoupdate.AutoUpdateAgentRolloutStatusGroup{ + { + Name: group1Name, + State: autoupdate.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_ACTIVE, + StartTime: timestamppb.New(fewMinutesAgo), + LastUpdateTime: timestamppb.New(clock.Now()), + LastUpdateReason: updateReasonUpdateInProgress, + ConfigDays: canStartToday, + ConfigStartHour: matchingStartHour, + // Group1 is the catch-all group, so it should count group2 agents + PresentCount: 20, + UpToDateCount: 10, + // InitialCount must not be changed during active -> active transitions + InitialCount: 
25, + }, + }, + }, + { + name: "single group unstarted -> active", + initialState: []*autoupdate.AutoUpdateAgentRolloutStatusGroup{ + { + Name: group1Name, + State: autoupdate.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_UNSTARTED, + LastUpdateTime: timestamppb.New(yesterday), + LastUpdateReason: updateReasonCreated, + ConfigDays: canStartToday, + ConfigStartHour: matchingStartHour, + PresentCount: 12, + UpToDateCount: 3, + }, + }, + reports: testReports, + expectedState: []*autoupdate.AutoUpdateAgentRolloutStatusGroup{ + { + Name: group1Name, + State: autoupdate.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_ACTIVE, + StartTime: timestamppb.New(clock.Now()), + LastUpdateTime: timestamppb.New(clock.Now()), + LastUpdateReason: updateReasonCanStart, + ConfigDays: canStartToday, + ConfigStartHour: matchingStartHour, + // InitialCount must be set during unstarted -> active transition + InitialCount: 20, + PresentCount: 20, + UpToDateCount: 10, + }, + }, + }, + { + name: "first group done, second should activate, third should not progress, with reports", + initialState: []*autoupdate.AutoUpdateAgentRolloutStatusGroup{ + { + Name: group1Name, + State: autoupdate.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_DONE, + StartTime: timestamppb.New(yesterday), + LastUpdateTime: timestamppb.New(yesterday), + LastUpdateReason: updateReasonUpdateComplete, + ConfigDays: canStartToday, + ConfigStartHour: matchingStartHour, + InitialCount: 10, + PresentCount: 8, + UpToDateCount: 5, + }, + { + Name: group2Name, + State: autoupdate.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_UNSTARTED, + LastUpdateTime: timestamppb.New(yesterday), + LastUpdateReason: updateReasonCreated, + ConfigDays: canStartToday, + ConfigStartHour: matchingStartHour, + ConfigWaitHours: 24, + PresentCount: 2, + UpToDateCount: 2, + }, + { + Name: group3Name, + State: autoupdate.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_UNSTARTED, + LastUpdateTime: 
timestamppb.New(yesterday), + LastUpdateReason: updateReasonCreated, + ConfigDays: canStartToday, + ConfigStartHour: matchingStartHour, + ConfigWaitHours: 0, + }, + }, + reports: testReports, + expectedState: []*autoupdate.AutoUpdateAgentRolloutStatusGroup{ + { + Name: group1Name, + State: autoupdate.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_DONE, + StartTime: timestamppb.New(yesterday), + LastUpdateTime: timestamppb.New(yesterday), + LastUpdateReason: updateReasonUpdateComplete, + ConfigDays: canStartToday, + ConfigStartHour: matchingStartHour, + InitialCount: 10, + PresentCount: 10, + UpToDateCount: 5, + }, + { + Name: group2Name, + State: autoupdate.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_ACTIVE, + StartTime: timestamppb.New(clock.Now()), + LastUpdateTime: timestamppb.New(clock.Now()), + LastUpdateReason: updateReasonCanStart, + ConfigDays: canStartToday, + ConfigStartHour: matchingStartHour, + ConfigWaitHours: 24, + InitialCount: 10, + PresentCount: 10, + UpToDateCount: 5, + }, + { + Name: group3Name, + State: autoupdate.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_UNSTARTED, + LastUpdateTime: timestamppb.New(clock.Now()), + LastUpdateReason: updateReasonPreviousGroupsNotDone, + ConfigDays: canStartToday, + ConfigStartHour: matchingStartHour, + ConfigWaitHours: 0, + }, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -500,7 +718,31 @@ func Test_progressGroupsHaltOnError(t *testing.T) { State: 0, StartTime: tt.rolloutStartTime, } - err := strategy.progressRollout(ctx, nil, status, clock.Now()) + spec := &autoupdate.AutoUpdateAgentRolloutSpec{ + StartVersion: startVersion, + TargetVersion: targetVersion, + } + + stubs := mockClientStubs{} + if tt.reports == nil { + stubs.reportsAnswers = []callAnswer[[]*autoupdate.AutoUpdateAgentReport]{ + { + result: []*autoupdate.AutoUpdateAgentReport{}, + err: trace.NotFound("no report"), + }, + } + } else { + stubs.reportsAnswers = 
[]callAnswer[[]*autoupdate.AutoUpdateAgentReport]{ + { + result: tt.reports, + err: nil, + }, + } + } + clt := newMockClient(t, stubs) + strategy, err := newHaltOnErrorStrategy(log, clt) + require.NoError(t, err) + err = strategy.progressRollout(ctx, spec, status, clock.Now()) require.NoError(t, err) // We use require.Equal instead of Elements match because group order matters. // It's not super important for time-based, but is crucial for halt-on-error. diff --git a/lib/autoupdate/rollout/transitions.go b/lib/autoupdate/rollout/transitions.go index a746efbcbf74b..9bbdbde19cb99 100644 --- a/lib/autoupdate/rollout/transitions.go +++ b/lib/autoupdate/rollout/transitions.go @@ -49,7 +49,7 @@ func GroupListToGroupSet(groupList []string) GroupSet { // to commit it in the backend. // The function takes a desired State parameter to leave room for future canary // state support as specified in RFD 184. -func TriggerGroups(rollout *autoupdatev1pb.AutoUpdateAgentRollout, groupsToTrigger GroupSet, desiredState autoupdatev1pb.AutoUpdateAgentGroupState, now time.Time) error { +func TriggerGroups(rollout *autoupdatev1pb.AutoUpdateAgentRollout, reports []*autoupdatev1pb.AutoUpdateAgentReport, groupsToTrigger GroupSet, desiredState autoupdatev1pb.AutoUpdateAgentGroupState, now time.Time) error { // Validation part, we look for everything not in order or unsupported. 
if rollout == nil { return trace.BadParameter("rollout cannot be nil") @@ -72,6 +72,17 @@ func TriggerGroups(rollout *autoupdatev1pb.AutoUpdateAgentRollout, groupsToTrigg return trace.BadParameter("unsupported desired state: %s, supported states are 'unspecified' and 'active'", desiredState) } + // filter out expired reports + validReports := make([]*autoupdatev1pb.AutoUpdateAgentReport, 0, len(reports)) + for _, report := range reports { + // TODO replace time.minute by the auth periodic operation constant + if now.Sub(report.GetSpec().GetTimestamp().AsTime()) <= time.Minute { + validReports = append(validReports, report) + } + } + + countByGroup, upToDateByGroup := countUpToDate(validReports, rollout.GetSpec().GetTargetVersion()) + groups := rollout.GetStatus().GetGroups() if len(groups) == 0 { return trace.BadParameter("rollout has no groups") @@ -97,7 +108,17 @@ func TriggerGroups(rollout *autoupdatev1pb.AutoUpdateAgentRollout, groupsToTrigg return trace.BadParameter("group %q in unexpected state %s", groupName, group.GetState()) } + var initialCount, upToDateCount int setGroupState(group, desiredState, updateReasonManualTrigger, now) + if groupName == groups[len(groups)-1].GetName() { + initialCount, upToDateCount = countCatchAll(rollout.GetStatus(), countByGroup, upToDateByGroup) + } else { + initialCount = countByGroup[groupName] + upToDateCount = upToDateByGroup[groupName] + } + group.UpToDateCount = uint64(upToDateCount) + group.InitialCount = uint64(initialCount) + group.PresentCount = uint64(initialCount) } return nil diff --git a/lib/autoupdate/rollout/transitions_test.go b/lib/autoupdate/rollout/transitions_test.go index 260beff1c0ba5..9ee1e6464292c 100644 --- a/lib/autoupdate/rollout/transitions_test.go +++ b/lib/autoupdate/rollout/transitions_test.go @@ -24,19 +24,27 @@ import ( "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/testing/protocmp" 
"google.golang.org/protobuf/types/known/timestamppb" autoupdatev1pb "github.com/gravitational/teleport/api/gen/proto/go/teleport/autoupdate/v1" + headerv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/header/v1" "github.com/gravitational/teleport/api/types/autoupdate" ) func TestTriggerGroups(t *testing.T) { now := time.Now() nowPb := timestamppb.New(now) + fewSecondsAgo := now.Add(-3 * time.Second) + fewMinutesAgo := now.Add(-6 * time.Minute) + startVersion := "1.2.3" + targetVersion := "1.2.4" + otherVersion := "1.2.5" + spec := &autoupdatev1pb.AutoUpdateAgentRolloutSpec{ - StartVersion: "1.2.3", - TargetVersion: "1.2.4", + StartVersion: startVersion, + TargetVersion: targetVersion, Schedule: autoupdate.AgentsScheduleRegular, AutoupdateMode: autoupdate.AgentsUpdateModeEnabled, Strategy: autoupdate.AgentsStrategyHaltOnError, @@ -65,12 +73,58 @@ func TestTriggerGroups(t *testing.T) { }, }, } + testReports := []*autoupdatev1pb.AutoUpdateAgentReport{ + { + Metadata: &headerv1.Metadata{Name: "auth1"}, + Spec: &autoupdatev1pb.AutoUpdateAgentReportSpec{ + Timestamp: timestamppb.New(fewSecondsAgo), + Groups: map[string]*autoupdatev1pb.AutoUpdateAgentReportSpecGroup{ + "blue": { + Versions: map[string]*autoupdatev1pb.AutoUpdateAgentReportSpecGroupVersion{ + startVersion: {Count: 4}, + targetVersion: {Count: 5}, + otherVersion: {Count: 1}, + }, + }, + "dev": { + Versions: map[string]*autoupdatev1pb.AutoUpdateAgentReportSpecGroupVersion{ + startVersion: {Count: 5}, + targetVersion: {Count: 5}, + }, + }, + }, + }, + }, + { + // This report is expired, it must be ignored + Metadata: &headerv1.Metadata{Name: "auth2"}, + Spec: &autoupdatev1pb.AutoUpdateAgentReportSpec{ + Timestamp: timestamppb.New(fewMinutesAgo), + Groups: map[string]*autoupdatev1pb.AutoUpdateAgentReportSpecGroup{ + "blue": { + Versions: map[string]*autoupdatev1pb.AutoUpdateAgentReportSpecGroupVersion{ + startVersion: {Count: 123}, + targetVersion: {Count: 123}, + otherVersion: {Count: 123}, + }, 
+ }, + "stage": { + Versions: map[string]*autoupdatev1pb.AutoUpdateAgentReportSpecGroupVersion{ + startVersion: {Count: 123}, + targetVersion: {Count: 123}, + }, + }, + }, + }, + }, + } tests := []struct { name string rollout *autoupdatev1pb.AutoUpdateAgentRollout groupNames []string desiredState autoupdatev1pb.AutoUpdateAgentGroupState + reports []*autoupdatev1pb.AutoUpdateAgentReport expectedStatus *autoupdatev1pb.AutoUpdateAgentRolloutStatus expectErr require.ErrorAssertionFunc }{ @@ -78,7 +132,7 @@ func TestTriggerGroups(t *testing.T) { name: "valid transition", rollout: &autoupdatev1pb.AutoUpdateAgentRollout{ Spec: spec, - Status: status, + Status: proto.Clone(status).(*autoupdatev1pb.AutoUpdateAgentRolloutStatus), }, groupNames: []string{"blue", "prod", "backup"}, desiredState: autoupdatev1pb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_ACTIVE, @@ -117,6 +171,54 @@ func TestTriggerGroups(t *testing.T) { }, }, }, + { + name: "valid transition, with reports", + rollout: &autoupdatev1pb.AutoUpdateAgentRollout{ + Spec: spec, + Status: proto.Clone(status).(*autoupdatev1pb.AutoUpdateAgentRolloutStatus), + }, + groupNames: []string{"blue", "prod", "backup"}, + desiredState: autoupdatev1pb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_ACTIVE, + reports: testReports, + expectErr: require.NoError, + expectedStatus: &autoupdatev1pb.AutoUpdateAgentRolloutStatus{ + Groups: []*autoupdatev1pb.AutoUpdateAgentRolloutStatusGroup{ + { + Name: "blue", + State: autoupdatev1pb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_ACTIVE, + StartTime: nowPb, + LastUpdateTime: nowPb, + LastUpdateReason: updateReasonManualTrigger, + // The group transitioned, the count must be set + InitialCount: 10, + PresentCount: 10, + UpToDateCount: 5, + }, + { + Name: "dev", + State: autoupdatev1pb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_DONE, + }, + { + Name: "stage", + State: autoupdatev1pb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_ACTIVE, + 
}, + { + Name: "prod", + State: autoupdatev1pb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_ACTIVE, + StartTime: nowPb, + LastUpdateTime: nowPb, + LastUpdateReason: updateReasonManualTrigger, + }, + { + Name: "backup", + State: autoupdatev1pb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_ACTIVE, + StartTime: nowPb, + LastUpdateTime: nowPb, + LastUpdateReason: updateReasonManualTrigger, + }, + }, + }, + }, { name: "no groups in rollout", rollout: &autoupdatev1pb.AutoUpdateAgentRollout{ @@ -133,7 +235,7 @@ func TestTriggerGroups(t *testing.T) { name: "unsupported desired state", rollout: &autoupdatev1pb.AutoUpdateAgentRollout{ Spec: spec, - Status: status, + Status: proto.Clone(status).(*autoupdatev1pb.AutoUpdateAgentRolloutStatus), }, groupNames: []string{"prod", "backup"}, desiredState: autoupdatev1pb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_ROLLEDBACK, @@ -145,13 +247,13 @@ func TestTriggerGroups(t *testing.T) { name: "unsupported strategy", rollout: &autoupdatev1pb.AutoUpdateAgentRollout{ Spec: &autoupdatev1pb.AutoUpdateAgentRolloutSpec{ - StartVersion: "1.2.3", - TargetVersion: "1.2.4", + StartVersion: startVersion, + TargetVersion: targetVersion, Schedule: autoupdate.AgentsScheduleRegular, AutoupdateMode: autoupdate.AgentsUpdateModeEnabled, Strategy: autoupdate.AgentsStrategyTimeBased, }, - Status: status, + Status: proto.Clone(status).(*autoupdatev1pb.AutoUpdateAgentRolloutStatus), }, groupNames: []string{"prod", "backup"}, desiredState: autoupdatev1pb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_ACTIVE, @@ -163,12 +265,12 @@ func TestTriggerGroups(t *testing.T) { name: "unsupported schedule", rollout: &autoupdatev1pb.AutoUpdateAgentRollout{ Spec: &autoupdatev1pb.AutoUpdateAgentRolloutSpec{ - StartVersion: "1.2.3", - TargetVersion: "1.2.4", + StartVersion: startVersion, + TargetVersion: targetVersion, Schedule: autoupdate.AgentsScheduleImmediate, AutoupdateMode: autoupdate.AgentsUpdateModeEnabled, }, - Status: 
nil, + Status: proto.Clone(status).(*autoupdatev1pb.AutoUpdateAgentRolloutStatus), }, groupNames: []string{"prod", "backup"}, desiredState: autoupdatev1pb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_ACTIVE, @@ -203,7 +305,7 @@ func TestTriggerGroups(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - err := TriggerGroups(tt.rollout, GroupListToGroupSet(tt.groupNames), tt.desiredState, now) + err := TriggerGroups(tt.rollout, tt.reports, GroupListToGroupSet(tt.groupNames), tt.desiredState, now) tt.expectErr(t, err) if err == nil { From 42cdfff64b8e58f6f4c50b5fbff5a880d165e7d1 Mon Sep 17 00:00:00 2001 From: hugoShaka Date: Mon, 26 May 2025 14:15:17 -0400 Subject: [PATCH 2/3] Make report helpers reusable for time-based strategy --- api/constants/constants.go | 5 +++ lib/auth/agent_version_report.go | 3 +- lib/auth/auth.go | 4 +- lib/autoupdate/rollout/client.go | 38 +++++++++++++++++++ .../rollout/strategy_haltonerror.go | 37 +----------------- 5 files changed, 48 insertions(+), 39 deletions(-) diff --git a/api/constants/constants.go b/api/constants/constants.go index db619966eaae6..918af99028fd2 100644 --- a/api/constants/constants.go +++ b/api/constants/constants.go @@ -551,3 +551,8 @@ const ( // MaxPIVPINCacheTTL defines the maximum allowed TTL for PIV PIN client caches. const MaxPIVPINCacheTTL = time.Hour + +// AutoUpdateAgentReportPeriod is the period of the autoupdate agent reporting +// routine running in every auth server. Any report older than this period should +// be considered stale. 
+const AutoUpdateAgentReportPeriod = time.Minute diff --git a/lib/auth/agent_version_report.go b/lib/auth/agent_version_report.go index 3a7488fe70118..6dc3c4fc7f7af 100644 --- a/lib/auth/agent_version_report.go +++ b/lib/auth/agent_version_report.go @@ -25,6 +25,7 @@ import ( "github.com/gravitational/trace" "google.golang.org/protobuf/types/known/timestamppb" + "github.com/gravitational/teleport/api/constants" autoupdatev1pb "github.com/gravitational/teleport/api/gen/proto/go/teleport/autoupdate/v1" "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/api/types/autoupdate" @@ -55,7 +56,7 @@ func (ir instanceReport) collectInstance(handle inventory.UpstreamHandle) { // We skip servers that joined less than a minute ago as they might have been // connected to another auth instance a few seconds ago, which would lead to double-counting. - if ir.timestamp.Sub(handle.RegistrationTime()) < time.Minute { + if ir.timestamp.Sub(handle.RegistrationTime()) < constants.AutoUpdateAgentReportPeriod { return } // We skip control planes instances because we don't update them. diff --git a/lib/auth/auth.go b/lib/auth/auth.go index e19c617c5e63b..cf689942be48a 100644 --- a/lib/auth/auth.go +++ b/lib/auth/auth.go @@ -1546,8 +1546,8 @@ func (a *Server) runPeriodicOperations() { }) ticker.Push(interval.SubInterval[periodicIntervalKey]{ Key: autoUpdateAgentReportKey, - Duration: time.Minute, - FirstDuration: retryutils.FullJitter(time.Minute), + Duration: constants.AutoUpdateAgentReportPeriod, + FirstDuration: retryutils.FullJitter(constants.AutoUpdateAgentReportPeriod), // No jitter here, this is intentional and required for accurate tracking across auths. 
}) } diff --git a/lib/autoupdate/rollout/client.go b/lib/autoupdate/rollout/client.go index 5d348da313c9a..501e610d4a9b2 100644 --- a/lib/autoupdate/rollout/client.go +++ b/lib/autoupdate/rollout/client.go @@ -20,7 +20,11 @@ package rollout import ( "context" + "time" + "github.com/gravitational/trace" + + "github.com/gravitational/teleport/api/constants" autoupdatepb "github.com/gravitational/teleport/api/gen/proto/go/teleport/autoupdate/v1" "github.com/gravitational/teleport/api/types" ) @@ -52,3 +56,37 @@ type Client interface { // so the controller can measure the rollout progress. ListAutoUpdateAgentReports(ctx context.Context, pageSize int, nextKey string) ([]*autoupdatepb.AutoUpdateAgentReport, string, error) } + +func getAllReports(ctx context.Context, clt Client) ([]*autoupdatepb.AutoUpdateAgentReport, error) { + reports := make([]*autoupdatepb.AutoUpdateAgentReport, 0) + + // this is an in-memory client, we go for the max page size + const pageSize = 0 + var pageToken string + for { + page, nextToken, err := clt.ListAutoUpdateAgentReports(ctx, pageSize, pageToken) + if err != nil { + return nil, trace.Wrap(err) + } + reports = append(reports, page...) 
+ if nextToken == "" { + return reports, nil + } + pageToken = nextToken + } +} + +func getAllValidReports(ctx context.Context, clt Client, now time.Time) ([]*autoupdatepb.AutoUpdateAgentReport, error) { + allReports, err := getAllReports(ctx, clt) + if err != nil && !trace.IsNotFound(err) { + return nil, trace.Wrap(err, "getting all reports") + } + + validReports := make([]*autoupdatepb.AutoUpdateAgentReport, 0, len(allReports)) + for _, report := range allReports { + if now.Sub(report.GetSpec().GetTimestamp().AsTime()) <= constants.AutoUpdateAgentReportPeriod { + validReports = append(validReports, report) + } + } + return validReports, nil +} diff --git a/lib/autoupdate/rollout/strategy_haltonerror.go b/lib/autoupdate/rollout/strategy_haltonerror.go index 6e0df8957f7a9..d80ca7c654921 100644 --- a/lib/autoupdate/rollout/strategy_haltonerror.go +++ b/lib/autoupdate/rollout/strategy_haltonerror.go @@ -61,43 +61,8 @@ func newHaltOnErrorStrategy(log *slog.Logger, clt Client) (rolloutStrategy, erro }, nil } -func (h *haltOnErrorStrategy) getAllReports(ctx context.Context) ([]*autoupdate.AutoUpdateAgentReport, error) { - reports := make([]*autoupdate.AutoUpdateAgentReport, 0) - - // this is an in-memory client, we go for the max page size - const pageSize = 0 - var pageToken string - for { - page, nextToken, err := h.clt.ListAutoUpdateAgentReports(ctx, pageSize, pageToken) - if err != nil { - return nil, trace.Wrap(err) - } - reports = append(reports, page...) 
- if nextToken == "" { - return reports, nil - } - pageToken = nextToken - } -} - -func (h *haltOnErrorStrategy) getAllValidReports(ctx context.Context, now time.Time) ([]*autoupdate.AutoUpdateAgentReport, error) { - allReports, err := h.getAllReports(ctx) - if err != nil && !trace.IsNotFound(err) { - return nil, trace.Wrap(err, "getting all reports") - } - - validReports := make([]*autoupdate.AutoUpdateAgentReport, len(allReports)) - for _, report := range allReports { - // TODO replace time.minute by the auth periodic operation constant - if now.Sub(report.GetSpec().GetTimestamp().AsTime()) <= time.Minute { - validReports = append(validReports, report) - } - } - return validReports, nil -} - func (h *haltOnErrorStrategy) progressRollout(ctx context.Context, spec *autoupdate.AutoUpdateAgentRolloutSpec, status *autoupdate.AutoUpdateAgentRolloutStatus, now time.Time) error { - reports, err := h.getAllValidReports(ctx, now) + reports, err := getAllValidReports(ctx, h.clt, now) if err != nil && !trace.IsNotFound(err) { return trace.Wrap(err) } From f6836d209d6ee13ae6a13ab037f7fb50cfb4c08a Mon Sep 17 00:00:00 2001 From: hugoShaka Date: Tue, 27 May 2025 14:16:58 -0400 Subject: [PATCH 3/3] address edoardo's feedback --- lib/auth/autoupdate/autoupdatev1/service.go | 2 +- lib/autoupdate/rollout/client.go | 2 +- lib/autoupdate/rollout/transitions_test.go | 10 +++++----- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/lib/auth/autoupdate/autoupdatev1/service.go b/lib/auth/autoupdate/autoupdatev1/service.go index 6bad36a12f68f..0dee3c4f1c72a 100644 --- a/lib/auth/autoupdate/autoupdatev1/service.go +++ b/lib/auth/autoupdate/autoupdatev1/service.go @@ -662,7 +662,7 @@ func (s *Service) DeleteAutoUpdateAgentRollout(ctx context.Context, req *autoupd func (s *Service) getAllReports(ctx context.Context) ([]*autoupdate.AutoUpdateAgentReport, error) { reports := make([]*autoupdate.AutoUpdateAgentReport, 0) - // this is an in-memory client, we go for the max page size + 
// this is an in-memory client, we go for the default page size const pageSize = 0 var pageToken string for { diff --git a/lib/autoupdate/rollout/client.go b/lib/autoupdate/rollout/client.go index 501e610d4a9b2..d0fbbfaeae427 100644 --- a/lib/autoupdate/rollout/client.go +++ b/lib/autoupdate/rollout/client.go @@ -58,7 +58,7 @@ type Client interface { } func getAllReports(ctx context.Context, clt Client) ([]*autoupdatepb.AutoUpdateAgentReport, error) { - reports := make([]*autoupdatepb.AutoUpdateAgentReport, 0) + var reports []*autoupdatepb.AutoUpdateAgentReport // this is an in-memory client, we go for the max page size const pageSize = 0 diff --git a/lib/autoupdate/rollout/transitions_test.go b/lib/autoupdate/rollout/transitions_test.go index 9ee1e6464292c..8f72b5b3de543 100644 --- a/lib/autoupdate/rollout/transitions_test.go +++ b/lib/autoupdate/rollout/transitions_test.go @@ -132,7 +132,7 @@ func TestTriggerGroups(t *testing.T) { name: "valid transition", rollout: &autoupdatev1pb.AutoUpdateAgentRollout{ Spec: spec, - Status: proto.Clone(status).(*autoupdatev1pb.AutoUpdateAgentRolloutStatus), + Status: proto.CloneOf(status), }, groupNames: []string{"blue", "prod", "backup"}, desiredState: autoupdatev1pb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_ACTIVE, @@ -175,7 +175,7 @@ func TestTriggerGroups(t *testing.T) { name: "valid transition, with reports", rollout: &autoupdatev1pb.AutoUpdateAgentRollout{ Spec: spec, - Status: proto.Clone(status).(*autoupdatev1pb.AutoUpdateAgentRolloutStatus), + Status: proto.CloneOf(status), }, groupNames: []string{"blue", "prod", "backup"}, desiredState: autoupdatev1pb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_ACTIVE, @@ -235,7 +235,7 @@ func TestTriggerGroups(t *testing.T) { name: "unsupported desired state", rollout: &autoupdatev1pb.AutoUpdateAgentRollout{ Spec: spec, - Status: proto.Clone(status).(*autoupdatev1pb.AutoUpdateAgentRolloutStatus), + Status: proto.CloneOf(status), }, groupNames: 
[]string{"prod", "backup"}, desiredState: autoupdatev1pb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_ROLLEDBACK, @@ -253,7 +253,7 @@ func TestTriggerGroups(t *testing.T) { AutoupdateMode: autoupdate.AgentsUpdateModeEnabled, Strategy: autoupdate.AgentsStrategyTimeBased, }, - Status: proto.Clone(status).(*autoupdatev1pb.AutoUpdateAgentRolloutStatus), + Status: proto.CloneOf(status), }, groupNames: []string{"prod", "backup"}, desiredState: autoupdatev1pb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_ACTIVE, @@ -270,7 +270,7 @@ func TestTriggerGroups(t *testing.T) { Schedule: autoupdate.AgentsScheduleImmediate, AutoupdateMode: autoupdate.AgentsUpdateModeEnabled, }, - Status: proto.Clone(status).(*autoupdatev1pb.AutoUpdateAgentRolloutStatus), + Status: proto.CloneOf(status), }, groupNames: []string{"prod", "backup"}, desiredState: autoupdatev1pb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_ACTIVE,