From d25610e77433273c1b6ab04f624610c682d4a284 Mon Sep 17 00:00:00 2001 From: hugoShaka Date: Thu, 4 Sep 2025 16:00:44 -0400 Subject: [PATCH] Marshal iterator into event instead of map[string]any This commit fixes a data loss bug causing the DynamoDB cursor EventIndex field to be sightly changed due to conversion issues. As this field is used to index events, this could lead to paginated queries not returning the right events, either returning events from before or after the requirested page. In the worst case, this could cause a livelock as the query continuisly processes the same events. The data loss issue is caused by improper JSON unmarshalling of large integers. This happened because of this reasons: - JSON is fundamentally flawed as it offers a single number type "binary64" for all numbers, whether they are integers or float. Go's encoding/json library uses field types to detect if the number should be stored in an int64 or a float64. - [The AWS SDK v2 migration PR](https://github.com/gravitational/teleport/pull/44363) changed the cursor JSON unmarshalling logic and unmarshalled the cursor into `map[string]any`. This caused every integer field of `event` to round-trip through float64. - [The Emit event fallback PR](https://github.com/gravitational/teleport/pull/40854) changed the EventIndex value from a small incremental integer to a large unix nanosecond timestamp in case of conflict. The large value was no longer safe for storage in a float64. The combination of those 3 factors caused the cursor EventIndex to get corrupted and caused unexpected event query index offsets. When preseted with a non-existing document, DynamoDB still hashes it and starts the query from its supposed location in the index. This is why this issue has not been detected for so long. Its consequences were: - duplicated events returned on 2 consecutive pages (this case was handled properly by the event forwarder as it keeps track of the last processed event) - livelock if the number of duplicated events exceed the page size - non-forwarded events if the index offset was in the future --- lib/events/dynamoevents/dynamoevents.go | 84 +++++++++++++----- lib/events/dynamoevents/dynamoevents_test.go | 89 +++++++++++++++++++- 2 files changed, 149 insertions(+), 24 deletions(-) diff --git a/lib/events/dynamoevents/dynamoevents.go b/lib/events/dynamoevents/dynamoevents.go index f9283eb7193a1..10c4a87eeaeda 100644 --- a/lib/events/dynamoevents/dynamoevents.go +++ b/lib/events/dynamoevents/dynamoevents.go @@ -225,15 +225,45 @@ type Log struct { svc *dynamodb.Client } +// EventKey contains the subset of event fields used as a dynamo primary key, +// or used by a secondary index. +// This is used in checkpointKey to track the last processed event of a query +// and resume the query from there. +type EventKey struct { + // SessionID and EventIndex must always be set. + // If the event is not linked to a session, we generate a random UUID. + SessionID string + // EventIndex represent the relative order of an event in the sesssion. + // Its value can be 0 if the event does not belong to a session or if it is + // the first event of a session. + // Next session events increase this counter. In case of conflict + // (two events with the same SessionID and EventIndex), EventIndex is set to + // the Unix nanosecond timestamp. This seems to break the "EventIndex + // monotonically increases" property and might cause events to be + // backfilled/skipped by ongoing queries. + // This behavior was introduced in https://github.com/gravitational/teleport/pull/40854. + // Since then, EventIndex is a large int64 and might not survive a round-trip + // through float64. When its JSON representation is unmarshalled into + // `map[string]any`, the event will lose EventIndex precision this will + // cause data-loss. For critical usage, like DynamoDB ExclusiveStartKey, + // one must always marshall/unmarshall using the typed event struct. + // event.FieldsMap["ei"] suffers from this data loss, so its value and + // EventIndex might be different. + EventIndex int64 + // CreatedAt is used to know if the event is outside of the requested time + // range (so we can discard the cursor completely in this case). + // This is also used as a secondary DynamodDB index. + CreatedAt int64 `json:",omitempty" dynamodbav:",omitempty"` + // CreatedAtDate is used to identify the event partition. + CreatedAtDate string `json:",omitempty" dynamodbav:",omitempty"` +} + type event struct { - SessionID string - EventIndex int64 + EventKey EventType string - CreatedAt int64 Expires *int64 `json:"Expires,omitempty" dynamodbav:",omitempty"` FieldsMap events.EventFields EventNamespace string - CreatedAtDate string } const ( @@ -647,13 +677,15 @@ func (l *Log) createPutItem(sessionID string, in apievents.AuditEvent) (*dynamod return nil, trace.Wrap(err) } e := event{ - SessionID: sessionID, - EventIndex: in.GetIndex(), + EventKey: EventKey{ + SessionID: sessionID, + EventIndex: in.GetIndex(), + CreatedAt: in.GetTime().Unix(), + CreatedAtDate: in.GetTime().Format(iso8601DateFormat), + }, EventType: in.GetType(), EventNamespace: apidefaults.Namespace, - CreatedAt: in.GetTime().Unix(), FieldsMap: fieldsMap, - CreatedAtDate: in.GetTime().Format(iso8601DateFormat), } l.setExpiry(&e) av, err := attributevalue.MarshalMap(e) @@ -710,6 +742,7 @@ type checkpointKey struct { // EventKey is a derived identifier for an event used for resuming // sub-page breaks due to size constraints. + // TODO(hugoShaka): Deprecate and remove this field. EventKey string `json:"event_key,omitempty"` } @@ -840,7 +873,7 @@ func (l *Log) searchEventsRaw(ctx context.Context, fromUTC, toUTC time.Time, nam l.logger.DebugContext(ctx, "search events", "from", fromUTC, "to", toUTC, "filter", filter, "limit", limit, "start_key", startKey, "order", order, "checkpoint", checkpoint) if startKey != "" { - createdAt, err := GetCreatedAtFromStartKey(startKey) + createdAt, err := getCreatedAtFromCheckpoint(checkpoint) if err == nil { // we compare the cursor unix time to the from unix in order to drop the nanoseconds // that are not present in the cursor. @@ -962,10 +995,14 @@ func GetCreatedAtFromStartKey(startKey string) (time.Time, error) { if err != nil { return time.Time{}, trace.Wrap(err) } + return getCreatedAtFromCheckpoint(checkpoint) +} + +func getCreatedAtFromCheckpoint(checkpoint checkpointKey) (time.Time, error) { if checkpoint.Iterator == "" { return time.Time{}, errors.New("missing iterator") } - var e event + var e EventKey if err := json.Unmarshal([]byte(checkpoint.Iterator), &e); err != nil { return time.Time{}, trace.Wrap(err) } @@ -1015,13 +1052,13 @@ func getCheckpointFromLegacyStartKey(startKey string) (checkpointKey, error) { } // decode the dynamo attrs into the go map repr common to the old and new formats. - m := make(map[string]any) - if err := attributevalue.UnmarshalMap(convertedAttrMap, &m); err != nil { + var e event + if err := attributevalue.UnmarshalMap(convertedAttrMap, &e); err != nil { return checkpointKey{}, trace.Wrap(err) } // encode the map into json, making it equivalent to the new format. - iterator, err := json.Marshal(m) + iterator, err := json.Marshal(e) if err != nil { return checkpointKey{}, trace.Wrap(err) } @@ -1566,12 +1603,12 @@ func (l *eventsFetcher) processQueryOutput(output *dynamodb.QueryOutput, hasLeft l.checkpoint.Iterator = "" if output.LastEvaluatedKey != nil { - m := make(map[string]any) - if err := attributevalue.UnmarshalMap(output.LastEvaluatedKey, &m); err != nil { + var e EventKey + if err := attributevalue.UnmarshalMap(output.LastEvaluatedKey, &e); err != nil { return nil, false, trace.Wrap(err) } - iter, err := json.Marshal(&m) + iter, err := json.Marshal(&e) if err != nil { return nil, false, err } @@ -1592,6 +1629,9 @@ func (l *eventsFetcher) processQueryOutput(output *dynamodb.QueryOutput, hasLeft if err != nil { return nil, false, trace.Wrap(err) } + + // TODO(hugoShaka): Fix this. This code path has terrible performance + // and should be replaced by proper pagination. if !l.foundStart { key, err := getSubPageCheckpoint(&e) if err != nil { @@ -1673,13 +1713,13 @@ dateLoop: } if l.checkpoint.Iterator != "" { - m := make(map[string]any) - err = json.Unmarshal([]byte(l.checkpoint.Iterator), &m) + var e EventKey + err = json.Unmarshal([]byte(l.checkpoint.Iterator), &e) if err != nil { return nil, trace.Wrap(err) } - input.ExclusiveStartKey, err = attributevalue.MarshalMap(&m) + input.ExclusiveStartKey, err = attributevalue.MarshalMap(&e) if err != nil { return nil, trace.Wrap(err) } @@ -1756,12 +1796,12 @@ func (l *eventsFetcher) QueryBySessionIDIndex(ctx context.Context, sessionID str } if l.checkpoint.Iterator != "" { - m := make(map[string]string) - if err = json.Unmarshal([]byte(l.checkpoint.Iterator), &m); err != nil { + var e EventKey + if err = json.Unmarshal([]byte(l.checkpoint.Iterator), &e); err != nil { return nil, trace.Wrap(err) } - input.ExclusiveStartKey, err = attributevalue.MarshalMap(&m) + input.ExclusiveStartKey, err = attributevalue.MarshalMap(&e) if err != nil { return nil, trace.Wrap(err) } diff --git a/lib/events/dynamoevents/dynamoevents_test.go b/lib/events/dynamoevents/dynamoevents_test.go index 5dfd4cb6affdd..c70460e09094b 100644 --- a/lib/events/dynamoevents/dynamoevents_test.go +++ b/lib/events/dynamoevents/dynamoevents_test.go @@ -34,7 +34,6 @@ import ( "time" "github.com/aws/aws-sdk-go-v2/aws" - "github.com/google/go-cmp/cmp" "github.com/google/uuid" "github.com/gravitational/trace" "github.com/jonboulle/clockwork" @@ -825,5 +824,91 @@ func TestStartKeyBackCompat(t *testing.T) { newCP, err := getCheckpointFromStartKey(newStartKey) require.NoError(t, err) - require.Empty(t, cmp.Diff(oldCP, newCP)) + // we must check the iterator field equality separately because it's a string + // containing a JSON-encoded event and field ordering might not be consistent. + require.Equal(t, oldCP.EventKey, newCP.EventKey) + require.Equal(t, oldCP.Date, newCP.Date) + + var oldIterator, newIterator event + require.NoError(t, json.Unmarshal([]byte(oldCP.Iterator), &oldIterator)) + require.NoError(t, json.Unmarshal([]byte(newCP.Iterator), &newIterator)) + require.Equal(t, oldIterator, newIterator) +} + +// TestCursorIteratorPrecision exists because cursors are sensitive to data-loss +// and we had a bug where we would unmarshall a cursor into `map[string]any`, +// causing all int64 to do a round-trip through float64 and losing precision. +// The precision loss would cause the cursor hash to be in te past. +// If the cursor shifts by more events than the page size, this creates a +// livelock and the query cannot proceed. +// This test creates events with very close EventIndex (1 nanosecond diff), +// reads the events 1 by one, and makes sure the reader is not stuck reading the +// same event over and over. +func TestCursorIteratorPrecision(t *testing.T) { + tt := setupDynamoContext(t) + clock, ok := tt.log.Clock.(*clockwork.FakeClock) + require.True(t, ok, "this test requires a FakeClock") + baseTime := clock.Now().UTC() + + // Test Setup: creating fixtures really close in the dynamo index. + + // For this test to work, we need the same session ID for all events + sessionId := uuid.NewString() + numEvents := 5 + testEvents := make(map[string]struct{}, numEvents) + + for range numEvents { + id := uuid.NewString() + // For the first event, EventIndex will be zero, for the next ones it + // will be the unix nanosecond timestamp. + clock.Advance(time.Nanosecond) + err := tt.log.EmitAuditEvent(context.Background(), &apievents.Exec{ + UserMetadata: apievents.UserMetadata{User: "test-user"}, + Metadata: apievents.Metadata{ + ID: id, + Type: events.UserLoginEvent, + Time: clock.Now().UTC(), + }, + SessionMetadata: apievents.SessionMetadata{ + SessionID: sessionId, + }, + }) + testEvents[id] = struct{}{} + require.NoError(t, err) + } + + // Test execution: do paginated queries to read all the fixtures. + eventsSeen := make(map[string]apievents.AuditEvent, numEvents) + toTime := baseTime.Add(time.Hour) + var arr []apievents.AuditEvent + var err error + var checkpoint string + + for range testEvents { + arr, checkpoint, err = tt.log.SearchEvents(t.Context(), events.SearchEventsRequest{ + From: baseTime, + To: toTime, + Limit: 1, + Order: types.EventOrderAscending, + StartKey: checkpoint, + }) + require.NoError(t, err) + require.Len(t, arr, 1) + + id := arr[0].GetID() + var c checkpointKey + require.NoError(t, json.Unmarshal([]byte(checkpoint), &c), "event %s", id) + require.NotEmpty(t, c.Iterator, "event %s", id) + + var e EventKey + require.NoError(t, json.Unmarshal([]byte(c.Iterator), &e), "event %s", id) + eventsSeen[id] = arr[0] + } + + // Test validation: make sure that all fixtures were read (as opposed to + // some event being returned several times because of a cursor issue). + for id := range testEvents { + require.Contains(t, eventsSeen, id, "eventsSeen should contain %q", id) + } + }