Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 58 additions & 1 deletion lib/events/dynamoevents/dynamoevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -1657,7 +1657,44 @@ func (l *eventsFetcher) processQueryOutput(output *dynamodb.QueryOutput) ([]even
l.checkpoint.EventKey = ""
}
// Stop early when the fetcher's total size exceeds the response size limit.
if l.totalSize+len(data) >= events.MaxEventBytesInResponse {
if l.totalSize+len(data) > events.MaxEventBytesInResponse {
// Encountered an event that would push the total page over the size limit.
// Return all processed events, and the next event will be picked up on the next page.
if len(out) > 0 {
if err := l.saveCheckpointAtEvent(out[len(out)-1]); err != nil {
return nil, false, trace.Wrap(err)
}
return out, true, nil
}

// A single event is larger than the max page size - the best we can
// do is try to trim it.
e.FieldsMap, err = trimToMaxSize(e.FieldsMap)
if err != nil {
return nil, false, trace.Wrap(err, "failed to trim event to max size")
}
trimmedData, err := json.Marshal(e.FieldsMap)
if err != nil {
return nil, false, trace.Wrap(err)
}

if l.totalSize+len(trimmedData) > events.MaxEventBytesInResponse {
// Failed to trim the event to size.
// Even if we fail to trim the event, we still try to return the oversized event.
l.log.WarnContext(context.Background(), "Failed to trim event exceeding maximum response size.",
"event_type", e.FieldsMap.GetType(),
"event_id", e.FieldsMap.GetID(),
"event_size", len(data),
"event_size_after_trim", len(trimmedData),
)
}
events.MetricQueriedTrimmedEvents.Inc()

l.totalSize += len(trimmedData)
out = append(out, e)
l.left--

// Since we reached the response size limit, simply return the event.
if err := l.saveCheckpointAtEvent(out[len(out)-1]); err != nil {
return nil, false, trace.Wrap(err)
}
Expand All @@ -1677,6 +1714,26 @@ func (l *eventsFetcher) processQueryOutput(output *dynamodb.QueryOutput) ([]even
return out, false, nil
}

// trimToMaxSize shrinks an oversized event so that it fits within
// events.MaxEventBytesInResponse, possibly discarding part of its payload.
//
// The stored DynamoDB representation (events.EventFields) has no trimming
// support of its own, so the fields are round-tripped through the structured
// apievents.AuditEvent type, which implements TrimToMaxSize. The conversion
// is comparatively expensive, but it runs at most once per result page — and
// only when a single event alone exceeds the response limit — so the cost is
// acceptable in practice.
func trimToMaxSize(fields events.EventFields) (events.EventFields, error) {
	// Decode the raw field map into a structured audit event.
	evt, err := events.FromEventFields(fields)
	if err != nil {
		return nil, trace.Wrap(err)
	}

	// Trim the structured event down to the maximum response size.
	trimmed := evt.TrimToMaxSize(events.MaxEventBytesInResponse)

	// Re-encode the trimmed event back into the field-map representation.
	out, err := events.ToEventFields(trimmed)
	return out, trace.Wrap(err)
}

// saveCheckpointAtEvent updates the checkpoint iterator at the given event.
// This overrides LastEvaluatedKey to resume future processing from this iterator.
func (l *eventsFetcher) saveCheckpointAtEvent(e event) error {
Expand Down
80 changes: 80 additions & 0 deletions lib/events/dynamoevents/dynamoevents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1028,11 +1028,32 @@ func Test_eventsFetcher_QueryByDateIndex(t *testing.T) {
AppName: "app-4",
},
}
bigUntrimmableEvent := &apievents.AppCreate{
Metadata: apievents.Metadata{
ID: uuid.NewString(),
Time: time.Date(2025, 2, 5, 0, 0, 0, 0, time.UTC),
Type: events.AppCreateEvent,
},
AppMetadata: apievents.AppMetadata{
AppName: strings.Repeat("aaaaa", events.MaxEventBytesInResponse),
},
}
bigTrimmableEvent := &apievents.DatabaseSessionQuery{
Metadata: apievents.Metadata{
ID: uuid.NewString(),
Time: time.Date(2025, 2, 5, 0, 0, 0, 0, time.UTC),
Type: events.DatabaseSessionQueryEvent,
},
DatabaseQuery: strings.Repeat("aaaaa", events.MaxEventBytesInResponse),
}
bigTrimmedEvent := bigTrimmableEvent.TrimToMaxSize(events.MaxEventBytesInResponse)

// have a deterministic session ID (UID) when used in test cases
key1 := eventToKey(event1)
key3 := eventToKey(event3)
key4 := eventToKey(event4)
keyUntrimmable := eventToKey(bigUntrimmableEvent)
keyTrimmed := eventToKey(bigTrimmedEvent)

tests := []struct {
name string
Expand Down Expand Up @@ -1097,6 +1118,65 @@ func Test_eventsFetcher_QueryByDateIndex(t *testing.T) {
wantEvents: []apievents.AuditEvent{event1, event2, event3},
wantKey: &key3,
},
{
name: "events with big untrimmable event exceeding > MaxEventBytesInResponse",
limit: 10,
mockResponses: map[EventKey]mockResponse{
{}: {
events: []apievents.AuditEvent{event1},
returnKey: &key1,
},
key1: {
events: []apievents.AuditEvent{event2, event3, bigUntrimmableEvent},
returnKey: nil,
},
},
// we don't expect bigUntrimmableEvent because it should go to next batch
wantEvents: []apievents.AuditEvent{event1, event2, event3},
wantKey: &key3,
},
{
name: "only 1 big untrimmable event",
limit: 10,
mockResponses: map[EventKey]mockResponse{
{}: {
events: []apievents.AuditEvent{bigUntrimmableEvent},
returnKey: nil,
},
},
// we still want to receive the untrimmable event
wantEvents: []apievents.AuditEvent{bigUntrimmableEvent},
wantKey: &keyUntrimmable,
},
{
name: "events with big trimmable event exceeding > MaxEventBytesInResponse",
limit: 10,
mockResponses: map[EventKey]mockResponse{
{}: {
events: []apievents.AuditEvent{event1},
returnKey: &key1,
},
key1: {
events: []apievents.AuditEvent{event2, event3, bigTrimmableEvent},
returnKey: nil,
},
},
// we don't expect bigTrimmableEvent because it should go to next batch
wantEvents: []apievents.AuditEvent{event1, event2, event3},
wantKey: &key3,
},
{
name: "only 1 big trimmable event",
limit: 10,
mockResponses: map[EventKey]mockResponse{
{}: {
events: []apievents.AuditEvent{bigTrimmableEvent},
returnKey: nil,
},
},
wantEvents: []apievents.AuditEvent{bigTrimmedEvent},
wantKey: &keyTrimmed,
},
}

for _, test := range tests {
Expand Down
Loading