From df2db6d12e829b880d7c96345cdc77756bd889da Mon Sep 17 00:00:00 2001 From: Edoardo Spadolini Date: Thu, 1 Feb 2024 21:55:41 +0100 Subject: [PATCH] Fix oversized report submission --- .../teleport/aggregating/submitter.go | 11 +++++--- .../teleport/aggregating/submitter_test.go | 25 +++++++++++++++++++ 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/lib/usagereporter/teleport/aggregating/submitter.go b/lib/usagereporter/teleport/aggregating/submitter.go index 38d3ef61da7aa..19a81a9a890b0 100644 --- a/lib/usagereporter/teleport/aggregating/submitter.go +++ b/lib/usagereporter/teleport/aggregating/submitter.go @@ -131,10 +131,13 @@ func submitOnce(ctx context.Context, c SubmitterConfig) { } freeBatchSize := submitBatchSize - len(userActivityReports) - resourcePresenceReports, err := svc.listResourcePresenceReports(ctx, freeBatchSize) - if err != nil { - c.Log.WithError(err).Error("Failed to load resource counts reports for submission.") - return + var resourcePresenceReports []*prehogv1.ResourcePresenceReport + if freeBatchSize > 0 { + resourcePresenceReports, err = svc.listResourcePresenceReports(ctx, freeBatchSize) + if err != nil { + c.Log.WithError(err).Error("Failed to load resource counts reports for submission.") + return + } } totalReportCount := len(userActivityReports) + len(resourcePresenceReports) diff --git a/lib/usagereporter/teleport/aggregating/submitter_test.go b/lib/usagereporter/teleport/aggregating/submitter_test.go index 1e229d94292a4..f7f63220ffb4b 100644 --- a/lib/usagereporter/teleport/aggregating/submitter_test.go +++ b/lib/usagereporter/teleport/aggregating/submitter_test.go @@ -25,6 +25,7 @@ import ( "time" "github.com/google/uuid" + "github.com/gravitational/trace" "github.com/jonboulle/clockwork" "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" @@ -52,8 +53,13 @@ func TestSubmitOnce(t *testing.T) { svc := reportService{bk} var submitted []*prehogv1.UserActivityReport + var submittedPresence []*prehogv1.ResourcePresenceReport submitOk := func(ctx context.Context, req *prehogv1.SubmitUsageReportsRequest) (uuid.UUID, error) { + if l := len(req.UserActivity) + len(req.ResourcePresence); l > submitBatchSize { + return uuid.Nil, trace.LimitExceeded("got %v reports, expected at most %v", l, submitBatchSize) + } submitted = append(submitted, req.UserActivity...) + submittedPresence = append(submittedPresence, req.ResourcePresence...) return uuid.New(), nil } submitErr := func(ctx context.Context, req *prehogv1.SubmitUsageReportsRequest) (uuid.UUID, error) { @@ -146,7 +152,9 @@ func TestSubmitOnce(t *testing.T) { // successful submission, no remaining events but the alert stays for one more cycle submitOnce(ctx, scfg) require.Len(t, submitted, 1) + require.Len(t, submittedPresence, 1) submitted = nil + submittedPresence = nil alerts, err = scfg.Status.GetClusterAlerts(ctx, types.GetClusterAlertsRequest{ AlertID: alertName, @@ -163,4 +171,21 @@ func TestSubmitOnce(t *testing.T) { }) require.NoError(t, err) require.Empty(t, alerts) + + for i := 0; i < 20; i++ { + require.NoError(t, svc.upsertUserActivityReport(ctx, newReport(time.Now().UTC().Add(time.Duration(i)*time.Second)), reportTTL)) + } + for i := 0; i < 15; i++ { + require.NoError(t, svc.upsertResourcePresenceReport(ctx, newResourcePresenceReport(time.Now().UTC().Add(time.Duration(i)*time.Second)), reportTTL)) + } + clk.Advance(submitLockDuration) + submitOnce(ctx, scfg) + clk.Advance(submitLockDuration) + submitOnce(ctx, scfg) + clk.Advance(submitLockDuration) + submitOnce(ctx, scfg) + clk.Advance(submitLockDuration) + submitOnce(ctx, scfg) + require.Len(t, submitted, 20) + require.Len(t, submittedPresence, 15) }