diff --git a/lib/events/dynamoevents/dynamoevents.go b/lib/events/dynamoevents/dynamoevents.go index 7df2fa70e7316..6a958f6c5ca8b 100644 --- a/lib/events/dynamoevents/dynamoevents.go +++ b/lib/events/dynamoevents/dynamoevents.go @@ -1657,7 +1657,44 @@ func (l *eventsFetcher) processQueryOutput(output *dynamodb.QueryOutput) ([]even l.checkpoint.EventKey = "" } // Stop early when the fetcher's total size exceeds the response size limit. - if l.totalSize+len(data) >= events.MaxEventBytesInResponse { + if l.totalSize+len(data) > events.MaxEventBytesInResponse { + // Encountered an event that would push the total page over the size limit. + // Return all processed events, and the next event will be picked up on the next page. + if len(out) > 0 { + if err := l.saveCheckpointAtEvent(out[len(out)-1]); err != nil { + return nil, false, trace.Wrap(err) + } + return out, true, nil + } + + // A single event is larger than the max page size - the best we can + // do is try to trim it. + e.FieldsMap, err = trimToMaxSize(e.FieldsMap) + if err != nil { + return nil, false, trace.Wrap(err, "failed to trim event to max size") + } + trimmedData, err := json.Marshal(e.FieldsMap) + if err != nil { + return nil, false, trace.Wrap(err) + } + + if l.totalSize+len(trimmedData) > events.MaxEventBytesInResponse { + // Failed to trim the event to size. + // Even if we fail to trim the event, we still try to return the oversized event. + l.log.WarnContext(context.Background(), "Failed to trim event exceeding maximum response size.", + "event_type", e.FieldsMap.GetType(), + "event_id", e.FieldsMap.GetID(), + "event_size", len(data), + "event_size_after_trim", len(trimmedData), + ) + } + events.MetricQueriedTrimmedEvents.Inc() + + l.totalSize += len(trimmedData) + out = append(out, e) + l.left-- + + // Since we reached the response size limit, simply return the event. 
if err := l.saveCheckpointAtEvent(out[len(out)-1]); err != nil { return nil, false, trace.Wrap(err) } @@ -1677,6 +1714,26 @@ func (l *eventsFetcher) processQueryOutput(output *dynamodb.QueryOutput) ([]even return out, false, nil } +// trimToMaxSize makes a best-effort attempt to shrink a single event so that it fits into the maximum response size (events.MaxEventBytesInResponse). +// Trimming may discard data, and the result is not guaranteed to fit: +// the caller re-measures the trimmed event and logs a warning if it is still too large. +// The fields are converted to an apievents.AuditEvent with events.FromEventFields, trimmed with +// TrimToMaxSize, and converted back with events.ToEventFields; an error from either conversion is returned wrapped. +// This round trip is not efficient, but it is executed at most once per page, +// and only when a single event exceeds the limit, +// so it should not be a problem in practice. +func trimToMaxSize(fields events.EventFields) (events.EventFields, error) { + event, err := events.FromEventFields(fields) + if err != nil { + return nil, trace.Wrap(err) + } + + event = event.TrimToMaxSize(events.MaxEventBytesInResponse) + + fields, err = events.ToEventFields(event) + return fields, trace.Wrap(err) +} + // saveCheckpointAtEvent updates the checkpoint iterator at the given event. // This overrides LastEvaluatedKey to resume future processing from this iterator. 
func (l *eventsFetcher) saveCheckpointAtEvent(e event) error { diff --git a/lib/events/dynamoevents/dynamoevents_test.go b/lib/events/dynamoevents/dynamoevents_test.go index c466fc448b41e..288bdbd3e2e8b 100644 --- a/lib/events/dynamoevents/dynamoevents_test.go +++ b/lib/events/dynamoevents/dynamoevents_test.go @@ -1028,11 +1028,32 @@ func Test_eventsFetcher_QueryByDateIndex(t *testing.T) { AppName: "app-4", }, } + bigUntrimmableEvent := &apievents.AppCreate{ + Metadata: apievents.Metadata{ + ID: uuid.NewString(), + Time: time.Date(2025, 2, 5, 0, 0, 0, 0, time.UTC), + Type: events.AppCreateEvent, + }, + AppMetadata: apievents.AppMetadata{ + AppName: strings.Repeat("aaaaa", events.MaxEventBytesInResponse), + }, + } + bigTrimmableEvent := &apievents.DatabaseSessionQuery{ + Metadata: apievents.Metadata{ + ID: uuid.NewString(), + Time: time.Date(2025, 2, 5, 0, 0, 0, 0, time.UTC), + Type: events.DatabaseSessionQueryEvent, + }, + DatabaseQuery: strings.Repeat("aaaaa", events.MaxEventBytesInResponse), + } + bigTrimmedEvent := bigTrimmableEvent.TrimToMaxSize(events.MaxEventBytesInResponse) // have a deterministic session ID (UID) when used in test cases key1 := eventToKey(event1) key3 := eventToKey(event3) key4 := eventToKey(event4) + keyUntrimmable := eventToKey(bigUntrimmableEvent) + keyTrimmed := eventToKey(bigTrimmedEvent) tests := []struct { name string @@ -1097,6 +1118,65 @@ func Test_eventsFetcher_QueryByDateIndex(t *testing.T) { wantEvents: []apievents.AuditEvent{event1, event2, event3}, wantKey: &key3, }, + { + name: "events with big untrimmable event exceeding > MaxEventBytesInResponse", + limit: 10, + mockResponses: map[EventKey]mockResponse{ + {}: { + events: []apievents.AuditEvent{event1}, + returnKey: &key1, + }, + key1: { + events: []apievents.AuditEvent{event2, event3, bigUntrimmableEvent}, + returnKey: nil, + }, + }, + // we don't expect bigUntrimmableEvent because it should go to next batch + wantEvents: []apievents.AuditEvent{event1, event2, event3}, + 
wantKey: &key3, + }, + { + name: "only 1 big untrimmable event", + limit: 10, + mockResponses: map[EventKey]mockResponse{ + {}: { + events: []apievents.AuditEvent{bigUntrimmableEvent}, + returnKey: nil, + }, + }, + // we still want to receive the untrimmable event + wantEvents: []apievents.AuditEvent{bigUntrimmableEvent}, + wantKey: &keyUntrimmable, + }, + { + name: "events with big trimmable event exceeding > MaxEventBytesInResponse", + limit: 10, + mockResponses: map[EventKey]mockResponse{ + {}: { + events: []apievents.AuditEvent{event1}, + returnKey: &key1, + }, + key1: { + events: []apievents.AuditEvent{event2, event3, bigTrimmableEvent}, + returnKey: nil, + }, + }, + // we don't expect bigTrimmableEvent because it should go to next batch + wantEvents: []apievents.AuditEvent{event1, event2, event3}, + wantKey: &key3, + }, + { + name: "only 1 big trimmable event", + limit: 10, + mockResponses: map[EventKey]mockResponse{ + {}: { + events: []apievents.AuditEvent{bigTrimmableEvent}, + returnKey: nil, + }, + }, + wantEvents: []apievents.AuditEvent{bigTrimmedEvent}, + wantKey: &keyTrimmed, + }, } for _, test := range tests {