Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 75 additions & 12 deletions lib/events/dynamoevents/dynamoevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -1459,23 +1459,44 @@ func (l *eventsFetcher) processQueryOutput(output *dynamodb.QueryOutput, hasLeft
}
// Because this may break on non page boundaries an additional
// checkpoint is needed for sub-page breaks.
if l.totalSize+len(data) >= events.MaxEventBytesInResponse {
key, err := getSubPageCheckpoint(&e)
if l.totalSize+len(data) > events.MaxEventBytesInResponse {
// Encountered an event that would push the total page over the size limit.
// Return all processed events, and the next event will be picked up with the old iterator and saved sub-page checkpoint.
if len(out) > 0 {
if err := l.saveCheckpointAtEvent(e, oldIterator); err != nil {
return nil, false, trace.Wrap(err)
}
return out, true, nil
}

// A single event is larger than the max page size - the best we can
// do is try to trim it.
e.FieldsMap, err = trimToMaxSize(e.FieldsMap)
if err != nil {
return nil, false, trace.Wrap(err, "failed to trim event to max size")
}
trimmedData, err := json.Marshal(e.FieldsMap)
if err != nil {
return nil, false, trace.Wrap(err)
}
l.log.DebugContext(context.Background(), "breaking up sub-page due to event size", "key", key)
l.checkpoint.EventKey = key

// We need to reset the iterator so we get the previous page again.
l.checkpoint.Iterator = oldIterator

// If we stopped because of the size limit, we know that at least one event has to be fetched from the
// current date and old iterator, so we must set it to true independently of the hasLeftFun or
// the new iterator being empty.
l.hasLeft = true
if l.totalSize+len(trimmedData) > events.MaxEventBytesInResponse {
// Failed to trim the event to size.
// Even if we fail to trim the event, we still try to return the oversized event.
l.log.WarnContext(context.Background(), "Failed to trim event exceeding maximum response size.",
"event_type", e.FieldsMap.GetType(),
"event_id", e.FieldsMap.GetID(),
"event_size", len(data),
"event_size_after_trim", len(trimmedData),
)
}
events.MetricQueriedTrimmedEvents.Inc()

return out, true, nil
// We reached the response size limit.
// Either we reach the fetch limit (l.left == 0) or we have more items to process.
// For the latter, we must loop one more time to save the sub-page checkpoint at the next event,
// where we will resume future processing.
data = trimmedData
}
l.totalSize += len(data)
out = append(out, e)
Comment on lines -1478 to 1502
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because of the way sub-page checkpoints work in branch/v17 (old sub-pagination solution), we must save at the next event, not the last processed event in out slice. We simply perform one more loop iteration, which will trigger this logic to save at next event:

		if l.totalSize+len(data) > events.MaxEventBytesInResponse {
			// Encountered an event that would push the total page over the size limit.
			// Return all processed events, and the next event will be picked up with the old iterator and saved sub-page checkpoint.
			if len(out) > 0 {
				if err := l.saveCheckpointAtEvent(e, oldIterator); err != nil {
					return nil, false, trace.Wrap(err)
				}
				return out, true, nil
			}

Expand All @@ -1494,6 +1515,48 @@ func (l *eventsFetcher) processQueryOutput(output *dynamodb.QueryOutput, hasLeft
return out, false, nil
}

// trimToMaxSize attempts to trim the event to fit into the maximum response size (MaxEventBytesInResponse).
// If the event is larger than the maximum response size, it will be trimmed
// to the maximum size, which may result in loss of data.
// Trimming requires unmarshalling the event to apievents.AuditEvent and then
// calling TrimToMaxSize on it.
// This is not an efficient operation, but it is executed at most once per page,
// and only when a single event exceeds the limit,
// so it should not be a problem in practice.
// trimToMaxSize attempts to trim the event to fit into the maximum response size (MaxEventBytesInResponse).
// If the event is larger than the maximum response size, it will be trimmed
// to the maximum size, which may result in loss of data.
// Trimming requires unmarshalling the event to apievents.AuditEvent and then
// calling TrimToMaxSize on it.
// This is not an efficient operation, but it is executed at most once per page,
// and only when a single event exceeds the limit,
// so it should not be a problem in practice.
func trimToMaxSize(fields events.EventFields) (events.EventFields, error) {
	// Round-trip through the typed audit event, which knows how to trim itself.
	audit, err := events.FromEventFields(fields)
	if err != nil {
		return nil, trace.Wrap(err)
	}

	trimmed, err := events.ToEventFields(audit.TrimToMaxSize(events.MaxEventBytesInResponse))
	return trimmed, trace.Wrap(err)
}

// saveCheckpointAtEvent generates a sub-page checkpoint at the event causing a page break.
// Subsequent processing will resume from this exact event.
func (l *eventsFetcher) saveCheckpointAtEvent(e event, oldIterator string) error {
Copy link
Copy Markdown
Contributor Author

@kshi36 kshi36 Feb 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revised function saveCheckpointAtEvent to envelope the old sub-page checkpoint logic

key, err := getSubPageCheckpoint(&e)
if err != nil {
return trace.Wrap(err)
}

l.checkpoint.EventKey = key

// We need to reset the iterator so we get the previous page again.
l.checkpoint.Iterator = oldIterator

// If we stopped because of the size limit, we know that at least one event has to be fetched from the
// current date and old iterator, so we must set it to true independently of the hasLeftFun or
// the new iterator being empty.
l.hasLeft = true

l.log.DebugContext(context.Background(), "breaking up sub-page due to event size", "key", key, "oldIterator", oldIterator)
return nil
}

func (l *eventsFetcher) QueryByDateIndex(ctx context.Context, filterExpr *string) (values []event, err error) {
query := "CreatedAtDate = :date AND CreatedAt BETWEEN :start and :end"

Expand Down
251 changes: 251 additions & 0 deletions lib/events/dynamoevents/dynamoevents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,19 @@ import (
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
dynamodbtypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/uuid"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/gravitational/teleport"
apidefaults "github.com/gravitational/teleport/api/defaults"
"github.com/gravitational/teleport/api/types"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/events"
Expand Down Expand Up @@ -781,3 +787,248 @@ func TestCursorIteratorPrecision(t *testing.T) {
}

}

func Test_eventsFetcher_QueryByDateIndex(t *testing.T) {
event1 := &apievents.AppCreate{
Metadata: apievents.Metadata{
ID: uuid.NewString(),
Time: time.Date(2025, 2, 5, 0, 0, 0, 0, time.UTC),
Type: events.AppCreateEvent,
},
AppMetadata: apievents.AppMetadata{
AppName: "app-1",
},
}
event2 := &apievents.AppCreate{
Metadata: apievents.Metadata{
ID: uuid.NewString(),
Time: time.Date(2025, 2, 5, 0, 0, 0, 0, time.UTC),
Type: events.AppCreateEvent,
},
AppMetadata: apievents.AppMetadata{
AppName: "app-2",
},
}
event3 := &apievents.AppCreate{
Metadata: apievents.Metadata{
ID: uuid.NewString(),
Time: time.Date(2025, 2, 5, 0, 0, 0, 0, time.UTC),
Type: events.AppCreateEvent,
},
AppMetadata: apievents.AppMetadata{
AppName: "app-3",
},
}
bigUntrimmableEvent := &apievents.AppCreate{
Metadata: apievents.Metadata{
ID: uuid.NewString(),
Time: time.Date(2025, 2, 5, 0, 0, 0, 0, time.UTC),
Type: events.AppCreateEvent,
},
AppMetadata: apievents.AppMetadata{
AppName: strings.Repeat("aaaaa", events.MaxEventBytesInResponse),
},
}
bigTrimmableEvent := &apievents.DatabaseSessionQuery{
Metadata: apievents.Metadata{
ID: uuid.NewString(),
Time: time.Date(2025, 2, 5, 0, 0, 0, 0, time.UTC),
Type: events.DatabaseSessionQueryEvent,
},
DatabaseQuery: strings.Repeat("aaaaa", events.MaxEventBytesInResponse),
}
bigTrimmedEvent := bigTrimmableEvent.TrimToMaxSize(events.MaxEventBytesInResponse)

// have a deterministic session ID (UID) when used in test cases
// expect responses to return key of the next event to process, not the last event processed (sub-page break logic)
keyUntrimmable := mustEventToKey(t, bigUntrimmableEvent)
keyTrimmable := mustEventToKey(t, bigTrimmableEvent)

tests := []struct {
name string
limit int32
mockResponses map[EventKey]mockResponse
wantEvents []apievents.AuditEvent
wantKey string
}{
{
name: "no data returned from query, return empty results",
limit: 10,
},
{
name: "events with big untrimmable event exceeding > MaxEventBytesInResponse",
limit: 10,
mockResponses: map[EventKey]mockResponse{
{}: {
events: []apievents.AuditEvent{event1, event2, event3, bigUntrimmableEvent},
},
},
// we don't expect bigUntrimmableEvent because it should go to next batch
wantEvents: []apievents.AuditEvent{event1, event2, event3},
wantKey: keyUntrimmable,
Comment on lines +861 to +868
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All tests expect key of the next processed event (the big trimmable/untrimmable events)

},
{
name: "only 1 big untrimmable event",
limit: 10,
mockResponses: map[EventKey]mockResponse{
{}: {
events: []apievents.AuditEvent{bigUntrimmableEvent},
},
},
// we still want to receive the untrimmable event
wantEvents: []apievents.AuditEvent{bigUntrimmableEvent},
},
{
name: "events with big trimmable event exceeding > MaxEventBytesInResponse",
limit: 10,
mockResponses: map[EventKey]mockResponse{
{}: {
events: []apievents.AuditEvent{event1, event2, event3, bigTrimmableEvent},
},
},
// we don't expect bigTrimmableEvent because it should go to next batch
wantEvents: []apievents.AuditEvent{event1, event2, event3},
wantKey: keyTrimmable,
},
{
name: "only 1 big trimmable event",
limit: 10,
mockResponses: map[EventKey]mockResponse{
{}: {
events: []apievents.AuditEvent{bigTrimmableEvent},
},
},
wantEvents: []apievents.AuditEvent{bigTrimmedEvent},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
mock := &mockQuery{
responses: test.mockResponses,
}

ef := eventsFetcher{
log: slog.Default(),
api: mock,
dates: []string{"2025-02-05"},
fromUTC: time.Date(2025, 2, 5, 0, 0, 0, 0, time.UTC),
toUTC: time.Date(2025, 2, 6, 0, 0, 0, 0, time.UTC),
checkpoint: &checkpointKey{},
left: test.limit,
filter: searchEventsFilter{},
foundStart: true,
hasLeft: true,
}

gotRawEvents, err := ef.QueryByDateIndex(t.Context(), getExprFilter(ef.filter))
require.NoError(t, err)

if test.wantKey != "" {
require.Equal(t, test.wantKey, ef.checkpoint.EventKey)
}

got := make([]events.EventFields, 0, len(gotRawEvents))
for _, rawEvent := range gotRawEvents {
got = append(got, rawEvent.FieldsMap)
}

want := make([]events.EventFields, 0, len(test.wantEvents))
for _, event := range test.wantEvents {
fields, err := events.ToEventFields(event)
require.NoError(t, err)
want = append(want, fields)
}

require.Empty(t, cmp.Diff(want, got, cmpopts.EquateEmpty()))
})
}
}

// mockQuery is a test double for the DynamoDB query API that serves canned
// pages of events keyed by the query's exclusive-start key.
type mockQuery struct {
	// responses maps an exclusive-start key (zero value for the first page)
	// to the page of events returned for that key.
	responses map[EventKey]mockResponse
}

// mockResponse holds the audit events returned for a single mocked page.
type mockResponse struct {
	events []apievents.AuditEvent
}

// Query is a simple mock implementation that does not distinguish queries by date.
// It keys the canned response solely on the request's ExclusiveStartKey.
func (m *mockQuery) Query(ctx context.Context, params *dynamodb.QueryInput, optFns ...func(*dynamodb.Options)) (*dynamodb.QueryOutput, error) {
	if m.responses == nil {
		return &dynamodb.QueryOutput{}, nil
	}

	// A missing start key means the first page (zero-value EventKey).
	var startKey EventKey
	if params.ExclusiveStartKey != nil {
		if err := attributevalue.UnmarshalMap(params.ExclusiveStartKey, &startKey); err != nil {
			return nil, err
		}
	}

	resp, ok := m.responses[startKey]
	if !ok {
		return nil, trace.Errorf("return parameter not defined in mockQuery")
	}

	items, err := eventsToItems(resp.events)
	if err != nil {
		return nil, err
	}

	return &dynamodb.QueryOutput{Items: items}, nil
}

// eventsToItems marshals audit events into DynamoDB items shaped like rows of
// the events table, so the mock can return them from Query.
func eventsToItems(in []apievents.AuditEvent) ([]map[string]dynamodbtypes.AttributeValue, error) {
	items := make([]map[string]dynamodbtypes.AttributeValue, 0, len(in))
	for _, auditEvent := range in {
		fields, err := events.ToEventFields(auditEvent)
		if err != nil {
			return nil, err
		}

		row := event{
			EventKey: EventKey{
				SessionID:     auditEvent.GetID(), // to make testing deterministic, use ID
				EventIndex:    auditEvent.GetIndex(),
				CreatedAt:     auditEvent.GetTime().Unix(),
				CreatedAtDate: auditEvent.GetTime().Format(iso8601DateFormat),
			},
			EventType:      auditEvent.GetType(),
			EventNamespace: apidefaults.Namespace,
			FieldsMap:      fields,
		}

		item, err := attributevalue.MarshalMap(row)
		if err != nil {
			return nil, err
		}
		items = append(items, item)
	}

	return items, nil
}

// mustEventToKey builds the DynamoDB row for the given audit event (mirroring
// eventsToItems) and returns the sub-page checkpoint key the fetcher would
// save for it, failing the test on any conversion error.
func mustEventToKey(t *testing.T, e apievents.AuditEvent) string {
	// Mark as a helper so require failures are reported at the caller's line.
	t.Helper()

	fieldsMap, err := events.ToEventFields(e)
	require.NoError(t, err)

	event := event{
		EventKey: EventKey{
			SessionID:     e.GetID(), // to make testing deterministic, use ID
			EventIndex:    e.GetIndex(),
			CreatedAt:     e.GetTime().Unix(),
			CreatedAtDate: e.GetTime().Format(iso8601DateFormat),
		},
		EventType:      e.GetType(),
		EventNamespace: apidefaults.Namespace,
		FieldsMap:      fieldsMap,
	}

	key, err := getSubPageCheckpoint(&event)
	require.NoError(t, err)

	return key
}
Loading