diff --git a/lib/usagereporter/teleport/aggregating/reporter.go b/lib/usagereporter/teleport/aggregating/reporter.go index 3498f47ab2e27..3a460acb5ca1b 100644 --- a/lib/usagereporter/teleport/aggregating/reporter.go +++ b/lib/usagereporter/teleport/aggregating/reporter.go @@ -392,17 +392,16 @@ func (r *Reporter) persistUserActivity(ctx context.Context, startTime time.Time, records = append(records, record) } - for len(records) > 0 { - report, err := prepareUserActivityReport(r.clusterName, r.hostID, startTime, records) - if err != nil { - r.log.WithError(err).WithFields(logrus.Fields{ - "start_time": startTime, - "lost_records": len(records), - }).Error("Failed to prepare user activity report, dropping data.") - return - } - records = records[len(report.Records):] + reports, err := prepareUserActivityReports(r.clusterName, r.hostID, startTime, records) + if err != nil { + r.log.WithError(err).WithFields(logrus.Fields{ + "start_time": startTime, + "lost_records": len(records), + }).Error("Failed to prepare user activity report, dropping data.") + return + } + for _, report := range reports { if err := r.svc.upsertUserActivityReport(ctx, report, reportTTL); err != nil { r.log.WithError(err).WithFields(logrus.Fields{ "start_time": startTime, diff --git a/lib/usagereporter/teleport/aggregating/service.go b/lib/usagereporter/teleport/aggregating/service.go index 6af02d8507478..12172533e9ad1 100644 --- a/lib/usagereporter/teleport/aggregating/service.go +++ b/lib/usagereporter/teleport/aggregating/service.go @@ -51,28 +51,33 @@ func userActivityReportKey(reportUUID uuid.UUID, startTime time.Time) []byte { return backend.Key(userActivityReportsPrefix, startTime.Format(time.RFC3339), reportUUID.String()) } -func prepareUserActivityReport( +func prepareUserActivityReports( clusterName, reporterHostID []byte, startTime time.Time, records []*prehogv1.UserActivityRecord, -) (*prehogv1.UserActivityReport, error) { - reportUUID := uuid.New() - report := &prehogv1.UserActivityReport{ - ReportUuid: reportUUID[:], - ClusterName: clusterName, - ReporterHostid: reporterHostID, - StartTime: timestamppb.New(startTime), - Records: records, - } +) (reports []*prehogv1.UserActivityReport, err error) { + for len(records) > 0 { + reportUUID := uuid.New() + report := &prehogv1.UserActivityReport{ + ReportUuid: reportUUID[:], + ClusterName: clusterName, + ReporterHostid: reporterHostID, + StartTime: timestamppb.New(startTime), + Records: records, + } + + for proto.Size(report) > maxItemSize { + if len(report.Records) <= 1 { + return nil, trace.LimitExceeded("failed to marshal user activity report within size limit (this is a bug)") + } - for proto.Size(report) > maxItemSize { - if len(report.Records) <= 1 { - return nil, trace.LimitExceeded("failed to marshal user activity report within size limit (this is a bug)") + report.Records = report.Records[:len(report.Records)/2] } - report.Records = report.Records[:len(report.Records)/2] + records = records[len(report.Records):] + reports = append(reports, report) } - return report, nil + return reports, nil } // resourcePresenceReportKey returns the backend key for a resource presence report with diff --git a/lib/usagereporter/teleport/aggregating/service_test.go b/lib/usagereporter/teleport/aggregating/service_test.go index 3438aa250d365..9d075743b0779 100644 --- a/lib/usagereporter/teleport/aggregating/service_test.go +++ b/lib/usagereporter/teleport/aggregating/service_test.go @@ -90,6 +90,29 @@ func TestCRUD(t *testing.T) { require.True(t, proto.Equal(r2, reports[0])) } +func TestUserActivityReportSplitting(t *testing.T) { + recordCount := 10000 + records := make([]*prehogv1.UserActivityRecord, 0, recordCount) + for i := 0; i < recordCount; i++ { + records = append(records, &prehogv1.UserActivityRecord{ + UserName: []byte("user"), + Logins: 100500, + SshSessions: 42, + }) + } + reports, err := prepareUserActivityReports([]byte("clusterName"), []byte("reporterHostID"), time.Now(), records) + require.NoError(t, err) + require.GreaterOrEqual(t, len(reports), 2) // some reports were split into two + require.GreaterOrEqual(t, len(reports[0].Records), 2) // first report was able to contain a few user activity records + + // reassemble records and ensure that nothing was lost + recordsCopy := make([]*prehogv1.UserActivityRecord, 0, recordCount) + for _, report := range reports { + recordsCopy = append(recordsCopy, report.Records...) + } + require.Equal(t, records, recordsCopy, "some user activity records have been lost during splitting") +} + func TestLock(t *testing.T) { ctx := context.Background() clk := clockwork.NewFakeClock()