diff --git a/lib/events/dynamoevents/dynamoevents.go b/lib/events/dynamoevents/dynamoevents.go index ff3795e6da684..ffc7652584bc8 100644 --- a/lib/events/dynamoevents/dynamoevents.go +++ b/lib/events/dynamoevents/dynamoevents.go @@ -1459,23 +1459,44 @@ func (l *eventsFetcher) processQueryOutput(output *dynamodb.QueryOutput, hasLeft } // Because this may break on non page boundaries an additional // checkpoint is needed for sub-page breaks. - if l.totalSize+len(data) >= events.MaxEventBytesInResponse { - key, err := getSubPageCheckpoint(&e) + if l.totalSize+len(data) > events.MaxEventBytesInResponse { + // Encountered an event that would push the total page over the size limit. + // Return all processed events, and the next event will be picked up with the old iterator and saved sub-page checkpoint. + if len(out) > 0 { + if err := l.saveCheckpointAtEvent(e, oldIterator); err != nil { + return nil, false, trace.Wrap(err) + } + return out, true, nil + } + + // A single event is larger than the max page size - the best we can + // do is try to trim it. + e.FieldsMap, err = trimToMaxSize(e.FieldsMap) + if err != nil { + return nil, false, trace.Wrap(err, "failed to trim event to max size") + } + trimmedData, err := json.Marshal(e.FieldsMap) if err != nil { return nil, false, trace.Wrap(err) } - l.log.DebugContext(context.Background(), "breaking up sub-page due to event size", "key", key) - l.checkpoint.EventKey = key - - // We need to reset the iterator so we get the previous page again. - l.checkpoint.Iterator = oldIterator - // If we stopped because of the size limit, we know that at least one event has to be fetched from the - // current date and old iterator, so we must set it to true independently of the hasLeftFun or - // the new iterator being empty. - l.hasLeft = true + if l.totalSize+len(trimmedData) > events.MaxEventBytesInResponse { + // Failed to trim the event to size. + // Even if we fail to trim the event, we still try to return the oversized event. 
+ l.log.WarnContext(context.Background(), "Failed to trim event exceeding maximum response size.", + "event_type", e.FieldsMap.GetType(), + "event_id", e.FieldsMap.GetID(), + "event_size", len(data), + "event_size_after_trim", len(trimmedData), + ) + } + events.MetricQueriedTrimmedEvents.Inc() - return out, true, nil + // We reached the response size limit. + // Either we reach the fetch limit (l.left == 0) or we have more items to process. + // For the latter, we must loop one more time to save the sub-page checkpoint at the next event, + // where we will resume future processing. + data = trimmedData } l.totalSize += len(data) out = append(out, e) @@ -1494,6 +1515,48 @@ func (l *eventsFetcher) processQueryOutput(output *dynamodb.QueryOutput, hasLeft return out, false, nil } +// trimToMaxSize attempts to trim the event to fit into the maximum response size (MaxEventBytesInResponse). +// If the event is larger than the maximum response size, it will be trimmed +// to the maximum size, which may result in loss of data. +// Trimming requires unmarshalling the event to apievents.AuditEvent and then +// calling TrimToMaxSize on it. +// This is not an efficient operation, but it is executed at most once per page, +// and only when a single event exceeds the limit, +// so it should not be a problem in practice. +func trimToMaxSize(fields events.EventFields) (events.EventFields, error) { + event, err := events.FromEventFields(fields) + if err != nil { + return nil, trace.Wrap(err) + } + + event = event.TrimToMaxSize(events.MaxEventBytesInResponse) + + fields, err = events.ToEventFields(event) + return fields, trace.Wrap(err) +} + +// saveCheckpointAtEvent generates a sub-page checkpoint at the event causing a page break. +// Subsequent processing will resume from this exact event. 
+func (l *eventsFetcher) saveCheckpointAtEvent(e event, oldIterator string) error { + key, err := getSubPageCheckpoint(&e) + if err != nil { + return trace.Wrap(err) + } + + l.checkpoint.EventKey = key + + // We need to reset the iterator so we get the previous page again. + l.checkpoint.Iterator = oldIterator + + // If we stopped because of the size limit, we know that at least one event has to be fetched from the + // current date and old iterator, so we must set it to true independently of the hasLeftFun or + // the new iterator being empty. + l.hasLeft = true + + l.log.DebugContext(context.Background(), "breaking up sub-page due to event size", "key", key, "oldIterator", oldIterator) + return nil +} + func (l *eventsFetcher) QueryByDateIndex(ctx context.Context, filterExpr *string) (values []event, err error) { query := "CreatedAtDate = :date AND CreatedAt BETWEEN :start and :end" diff --git a/lib/events/dynamoevents/dynamoevents_test.go b/lib/events/dynamoevents/dynamoevents_test.go index a792fb2c4b6c2..3d0e39bb96f20 100644 --- a/lib/events/dynamoevents/dynamoevents_test.go +++ b/lib/events/dynamoevents/dynamoevents_test.go @@ -34,6 +34,11 @@ import ( "time" "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + dynamodbtypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "github.com/google/uuid" "github.com/gravitational/trace" "github.com/jonboulle/clockwork" @@ -41,6 +46,7 @@ import ( "github.com/stretchr/testify/require" "github.com/gravitational/teleport" + apidefaults "github.com/gravitational/teleport/api/defaults" "github.com/gravitational/teleport/api/types" apievents "github.com/gravitational/teleport/api/types/events" "github.com/gravitational/teleport/lib/events" @@ -781,3 +787,248 @@ func TestCursorIteratorPrecision(t *testing.T) { } } + +func 
Test_eventsFetcher_QueryByDateIndex(t *testing.T) { + event1 := &apievents.AppCreate{ + Metadata: apievents.Metadata{ + ID: uuid.NewString(), + Time: time.Date(2025, 2, 5, 0, 0, 0, 0, time.UTC), + Type: events.AppCreateEvent, + }, + AppMetadata: apievents.AppMetadata{ + AppName: "app-1", + }, + } + event2 := &apievents.AppCreate{ + Metadata: apievents.Metadata{ + ID: uuid.NewString(), + Time: time.Date(2025, 2, 5, 0, 0, 0, 0, time.UTC), + Type: events.AppCreateEvent, + }, + AppMetadata: apievents.AppMetadata{ + AppName: "app-2", + }, + } + event3 := &apievents.AppCreate{ + Metadata: apievents.Metadata{ + ID: uuid.NewString(), + Time: time.Date(2025, 2, 5, 0, 0, 0, 0, time.UTC), + Type: events.AppCreateEvent, + }, + AppMetadata: apievents.AppMetadata{ + AppName: "app-3", + }, + } + bigUntrimmableEvent := &apievents.AppCreate{ + Metadata: apievents.Metadata{ + ID: uuid.NewString(), + Time: time.Date(2025, 2, 5, 0, 0, 0, 0, time.UTC), + Type: events.AppCreateEvent, + }, + AppMetadata: apievents.AppMetadata{ + AppName: strings.Repeat("aaaaa", events.MaxEventBytesInResponse), + }, + } + bigTrimmableEvent := &apievents.DatabaseSessionQuery{ + Metadata: apievents.Metadata{ + ID: uuid.NewString(), + Time: time.Date(2025, 2, 5, 0, 0, 0, 0, time.UTC), + Type: events.DatabaseSessionQueryEvent, + }, + DatabaseQuery: strings.Repeat("aaaaa", events.MaxEventBytesInResponse), + } + bigTrimmedEvent := bigTrimmableEvent.TrimToMaxSize(events.MaxEventBytesInResponse) + + // have a deterministic session ID (UID) when used in test cases + // expect responses to return key of the next event to process, not the last event processed (sub-page break logic) + keyUntrimmable := mustEventToKey(t, bigUntrimmableEvent) + keyTrimmable := mustEventToKey(t, bigTrimmableEvent) + + tests := []struct { + name string + limit int32 + mockResponses map[EventKey]mockResponse + wantEvents []apievents.AuditEvent + wantKey string + }{ + { + name: "no data returned from query, return empty results", + limit: 
10, + }, + { + name: "events with big untrimmable event exceeding > MaxEventBytesInResponse", + limit: 10, + mockResponses: map[EventKey]mockResponse{ + {}: { + events: []apievents.AuditEvent{event1, event2, event3, bigUntrimmableEvent}, + }, + }, + // we don't expect bigUntrimmableEvent because it should go to next batch + wantEvents: []apievents.AuditEvent{event1, event2, event3}, + wantKey: keyUntrimmable, + }, + { + name: "only 1 big untrimmable event", + limit: 10, + mockResponses: map[EventKey]mockResponse{ + {}: { + events: []apievents.AuditEvent{bigUntrimmableEvent}, + }, + }, + // we still want to receive the untrimmable event + wantEvents: []apievents.AuditEvent{bigUntrimmableEvent}, + }, + { + name: "events with big trimmable event exceeding > MaxEventBytesInResponse", + limit: 10, + mockResponses: map[EventKey]mockResponse{ + {}: { + events: []apievents.AuditEvent{event1, event2, event3, bigTrimmableEvent}, + }, + }, + // we don't expect bigTrimmableEvent because it should go to next batch + wantEvents: []apievents.AuditEvent{event1, event2, event3}, + wantKey: keyTrimmable, + }, + { + name: "only 1 big trimmable event", + limit: 10, + mockResponses: map[EventKey]mockResponse{ + {}: { + events: []apievents.AuditEvent{bigTrimmableEvent}, + }, + }, + wantEvents: []apievents.AuditEvent{bigTrimmedEvent}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + mock := &mockQuery{ + responses: test.mockResponses, + } + + ef := eventsFetcher{ + log: slog.Default(), + api: mock, + dates: []string{"2025-02-05"}, + fromUTC: time.Date(2025, 2, 5, 0, 0, 0, 0, time.UTC), + toUTC: time.Date(2025, 2, 6, 0, 0, 0, 0, time.UTC), + checkpoint: &checkpointKey{}, + left: test.limit, + filter: searchEventsFilter{}, + foundStart: true, + hasLeft: true, + } + + gotRawEvents, err := ef.QueryByDateIndex(t.Context(), getExprFilter(ef.filter)) + require.NoError(t, err) + + if test.wantKey != "" { + require.Equal(t, test.wantKey, ef.checkpoint.EventKey) 
+ } + + got := make([]events.EventFields, 0, len(gotRawEvents)) + for _, rawEvent := range gotRawEvents { + got = append(got, rawEvent.FieldsMap) + } + + want := make([]events.EventFields, 0, len(test.wantEvents)) + for _, event := range test.wantEvents { + fields, err := events.ToEventFields(event) + require.NoError(t, err) + want = append(want, fields) + } + + require.Empty(t, cmp.Diff(want, got, cmpopts.EquateEmpty())) + }) + } +} + +type mockQuery struct { + responses map[EventKey]mockResponse +} + +type mockResponse struct { + events []apievents.AuditEvent +} + +// Query is a simple mock implementation that does not distinguish queries by date. +func (m *mockQuery) Query(ctx context.Context, params *dynamodb.QueryInput, optFns ...func(*dynamodb.Options)) (*dynamodb.QueryOutput, error) { + if m.responses == nil { + return &dynamodb.QueryOutput{}, nil + } + + var currentKey EventKey + if params.ExclusiveStartKey != nil { + if err := attributevalue.UnmarshalMap(params.ExclusiveStartKey, &currentKey); err != nil { + return nil, err + } + } + + response, ok := m.responses[currentKey] + if !ok { + return nil, trace.Errorf("return parameter not defined in mockQuery") + } + + items, err := eventsToItems(response.events) + if err != nil { + return nil, err + } + + return &dynamodb.QueryOutput{ + Items: items, + }, nil +} + +func eventsToItems(in []apievents.AuditEvent) ([]map[string]dynamodbtypes.AttributeValue, error) { + items := make([]map[string]dynamodbtypes.AttributeValue, 0, len(in)) + for _, e := range in { + fieldsMap, err := events.ToEventFields(e) + if err != nil { + return nil, err + } + + event := event{ + EventKey: EventKey{ + SessionID: e.GetID(), // to make testing deterministic, use ID + EventIndex: e.GetIndex(), + CreatedAt: e.GetTime().Unix(), + CreatedAtDate: e.GetTime().Format(iso8601DateFormat), + }, + EventType: e.GetType(), + EventNamespace: apidefaults.Namespace, + FieldsMap: fieldsMap, + } + item, err := attributevalue.MarshalMap(event) + if err != 
nil { + return nil, err + } + + items = append(items, item) + } + + return items, nil +} + +func mustEventToKey(t *testing.T, e apievents.AuditEvent) string { + fieldsMap, err := events.ToEventFields(e) + require.NoError(t, err) + + event := event{ + EventKey: EventKey{ + SessionID: e.GetID(), // to make testing deterministic, use ID + EventIndex: e.GetIndex(), + CreatedAt: e.GetTime().Unix(), + CreatedAtDate: e.GetTime().Format(iso8601DateFormat), + }, + EventType: e.GetType(), + EventNamespace: apidefaults.Namespace, + FieldsMap: fieldsMap, + } + + key, err := getSubPageCheckpoint(&event) + require.NoError(t, err) + + return key +}