Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 62 additions & 22 deletions lib/events/dynamoevents/dynamoevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,15 +225,45 @@ type Log struct {
svc *dynamodb.Client
}

// EventKey contains the subset of event fields used as a DynamoDB primary key
// or by a secondary index.
// It is used in checkpointKey to track the last processed event of a query
// and to resume the query from there.
type EventKey struct {
// SessionID and EventIndex must always be set.
// If the event is not linked to a session, a random UUID is generated.
SessionID string
// EventIndex represents the relative order of an event in the session.
// Its value can be 0 if the event does not belong to a session or if it is
// the first event of a session.
// Subsequent session events increase this counter. In case of conflict
// (two events with the same SessionID and EventIndex), EventIndex is set to
// the Unix nanosecond timestamp. This seems to break the "EventIndex
// monotonically increases" property and might cause events to be
// backfilled/skipped by ongoing queries.
// This behavior was introduced in https://github.com/gravitational/teleport/pull/40854.
// Since then, EventIndex can be a large int64 that might not survive a
// round-trip through float64. When its JSON representation is unmarshaled
// into `map[string]any`, EventIndex loses precision, which causes data
// loss. For critical usage, like the DynamoDB ExclusiveStartKey, one must
// always marshal/unmarshal using the typed event struct.
// event.FieldsMap["ei"] suffers from this data loss, so its value and
// EventIndex might be different.
EventIndex int64
// CreatedAt is used to know if the event is outside of the requested time
// range (so we can discard the cursor completely in this case).
// It is also used as a secondary DynamoDB index.
// The field is omitted from the JSON and DynamoDB encodings when it is zero.
CreatedAt int64 `json:",omitempty" dynamodbav:",omitempty"`
// CreatedAtDate is used to identify the event partition.
// The field is omitted from the JSON and DynamoDB encodings when it is empty.
CreatedAtDate string `json:",omitempty" dynamodbav:",omitempty"`
}

// event is the record that createPutItem builds for an audit event and
// encodes with attributevalue.MarshalMap.
// NOTE(review): SessionID, EventIndex, CreatedAt and CreatedAtDate are declared
// both directly below and through the embedded EventKey, so the direct fields
// shadow the promoted ones. This looks like the pre-change and post-change
// sides of a diff rendered together — confirm which set of declarations is
// intended before relying on this definition.
type event struct {
SessionID string
EventIndex int64
// EventKey embeds the key fields (SessionID, EventIndex, CreatedAt,
// CreatedAtDate) so they are promoted onto event.
EventKey
EventType string
CreatedAt int64
// Expires is optional: a nil pointer is omitted from the JSON and DynamoDB
// encodings. Presumably populated by setExpiry — verify against that method.
Expires *int64 `json:"Expires,omitempty" dynamodbav:",omitempty"`
FieldsMap events.EventFields
// EventNamespace is set to apidefaults.Namespace by createPutItem.
EventNamespace string
CreatedAtDate string
}

const (
Expand Down Expand Up @@ -647,13 +677,15 @@ func (l *Log) createPutItem(sessionID string, in apievents.AuditEvent) (*dynamod
return nil, trace.Wrap(err)
}
e := event{
SessionID: sessionID,
EventIndex: in.GetIndex(),
EventKey: EventKey{
SessionID: sessionID,
EventIndex: in.GetIndex(),
CreatedAt: in.GetTime().Unix(),
CreatedAtDate: in.GetTime().Format(iso8601DateFormat),
},
EventType: in.GetType(),
EventNamespace: apidefaults.Namespace,
CreatedAt: in.GetTime().Unix(),
FieldsMap: fieldsMap,
CreatedAtDate: in.GetTime().Format(iso8601DateFormat),
}
l.setExpiry(&e)
av, err := attributevalue.MarshalMap(e)
Expand Down Expand Up @@ -710,6 +742,7 @@ type checkpointKey struct {

// EventKey is a derived identifier for an event used for resuming
// sub-page breaks due to size constraints.
// TODO(hugoShaka): Deprecate and remove this field.
EventKey string `json:"event_key,omitempty"`
}

Expand Down Expand Up @@ -840,7 +873,7 @@ func (l *Log) searchEventsRaw(ctx context.Context, fromUTC, toUTC time.Time, nam
l.logger.DebugContext(ctx, "search events", "from", fromUTC, "to", toUTC, "filter", filter, "limit", limit, "start_key", startKey, "order", order, "checkpoint", checkpoint)

if startKey != "" {
createdAt, err := GetCreatedAtFromStartKey(startKey)
createdAt, err := getCreatedAtFromCheckpoint(checkpoint)
if err == nil {
// we compare the cursor unix time to the from unix in order to drop the nanoseconds
// that are not present in the cursor.
Expand Down Expand Up @@ -962,10 +995,14 @@ func GetCreatedAtFromStartKey(startKey string) (time.Time, error) {
if err != nil {
return time.Time{}, trace.Wrap(err)
}
return getCreatedAtFromCheckpoint(checkpoint)
}

func getCreatedAtFromCheckpoint(checkpoint checkpointKey) (time.Time, error) {
if checkpoint.Iterator == "" {
return time.Time{}, errors.New("missing iterator")
}
var e event
var e EventKey
if err := json.Unmarshal([]byte(checkpoint.Iterator), &e); err != nil {
return time.Time{}, trace.Wrap(err)
}
Expand Down Expand Up @@ -1015,13 +1052,13 @@ func getCheckpointFromLegacyStartKey(startKey string) (checkpointKey, error) {
}

// decode the dynamo attrs into the go map repr common to the old and new formats.
m := make(map[string]any)
if err := attributevalue.UnmarshalMap(convertedAttrMap, &m); err != nil {
var e event
if err := attributevalue.UnmarshalMap(convertedAttrMap, &e); err != nil {
return checkpointKey{}, trace.Wrap(err)
}

// encode the map into json, making it equivalent to the new format.
iterator, err := json.Marshal(m)
iterator, err := json.Marshal(e)
if err != nil {
return checkpointKey{}, trace.Wrap(err)
}
Expand Down Expand Up @@ -1566,12 +1603,12 @@ func (l *eventsFetcher) processQueryOutput(output *dynamodb.QueryOutput, hasLeft
l.checkpoint.Iterator = ""

if output.LastEvaluatedKey != nil {
m := make(map[string]any)
if err := attributevalue.UnmarshalMap(output.LastEvaluatedKey, &m); err != nil {
var e EventKey
if err := attributevalue.UnmarshalMap(output.LastEvaluatedKey, &e); err != nil {
return nil, false, trace.Wrap(err)
}

iter, err := json.Marshal(&m)
iter, err := json.Marshal(&e)
if err != nil {
return nil, false, err
}
Expand All @@ -1592,6 +1629,9 @@ func (l *eventsFetcher) processQueryOutput(output *dynamodb.QueryOutput, hasLeft
if err != nil {
return nil, false, trace.Wrap(err)
}

// TODO(hugoShaka): Fix this. This code path has terrible performance
// and should be replaced by proper pagination.
if !l.foundStart {
key, err := getSubPageCheckpoint(&e)
if err != nil {
Expand Down Expand Up @@ -1673,13 +1713,13 @@ dateLoop:
}

if l.checkpoint.Iterator != "" {
m := make(map[string]any)
err = json.Unmarshal([]byte(l.checkpoint.Iterator), &m)
var e EventKey
err = json.Unmarshal([]byte(l.checkpoint.Iterator), &e)
if err != nil {
return nil, trace.Wrap(err)
}

input.ExclusiveStartKey, err = attributevalue.MarshalMap(&m)
input.ExclusiveStartKey, err = attributevalue.MarshalMap(&e)
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down Expand Up @@ -1756,12 +1796,12 @@ func (l *eventsFetcher) QueryBySessionIDIndex(ctx context.Context, sessionID str
}

if l.checkpoint.Iterator != "" {
m := make(map[string]string)
if err = json.Unmarshal([]byte(l.checkpoint.Iterator), &m); err != nil {
var e EventKey
if err = json.Unmarshal([]byte(l.checkpoint.Iterator), &e); err != nil {
return nil, trace.Wrap(err)
}

input.ExclusiveStartKey, err = attributevalue.MarshalMap(&m)
input.ExclusiveStartKey, err = attributevalue.MarshalMap(&e)
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down
89 changes: 87 additions & 2 deletions lib/events/dynamoevents/dynamoevents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
Expand Down Expand Up @@ -825,5 +824,91 @@ func TestStartKeyBackCompat(t *testing.T) {
newCP, err := getCheckpointFromStartKey(newStartKey)
require.NoError(t, err)

require.Empty(t, cmp.Diff(oldCP, newCP))
// we must check the iterator field equality separately because it's a string
// containing a JSON-encoded event and field ordering might not be consistent.
require.Equal(t, oldCP.EventKey, newCP.EventKey)
require.Equal(t, oldCP.Date, newCP.Date)

var oldIterator, newIterator event
require.NoError(t, json.Unmarshal([]byte(oldCP.Iterator), &oldIterator))
require.NoError(t, json.Unmarshal([]byte(newCP.Iterator), &newIterator))
require.Equal(t, oldIterator, newIterator)
}

// TestCursorIteratorPrecision exists because cursors are sensitive to data
// loss: we had a bug where a cursor was unmarshaled into `map[string]any`,
// causing every int64 to round-trip through float64 and lose precision.
// The precision loss would cause the cursor to be in the past.
// If the cursor shifts by more events than the page size, this creates a
// livelock and the query cannot proceed.
// This test creates events with very close EventIndex values (1 nanosecond
// apart), reads the events one by one, and makes sure the reader is not stuck
// reading the same event over and over.
func TestCursorIteratorPrecision(t *testing.T) {
	tt := setupDynamoContext(t)
	clock, ok := tt.log.Clock.(*clockwork.FakeClock)
	require.True(t, ok, "this test requires a FakeClock")
	baseTime := clock.Now().UTC()
	ctx := t.Context()

	// Test setup: create fixtures that are really close in the dynamo index.

	// For this test to work, we need the same session ID for all events.
	sessionID := uuid.NewString()
	const numEvents = 5
	testEvents := make(map[string]struct{}, numEvents)

	for range numEvents {
		id := uuid.NewString()
		// For the first event, EventIndex will be zero, for the next ones it
		// will be the unix nanosecond timestamp.
		clock.Advance(time.Nanosecond)
		err := tt.log.EmitAuditEvent(ctx, &apievents.Exec{
			UserMetadata: apievents.UserMetadata{User: "test-user"},
			Metadata: apievents.Metadata{
				ID:   id,
				Type: events.UserLoginEvent,
				Time: clock.Now().UTC(),
			},
			SessionMetadata: apievents.SessionMetadata{
				SessionID: sessionID,
			},
		})
		// Only record the fixture once we know it was actually emitted.
		require.NoError(t, err)
		testEvents[id] = struct{}{}
	}

	// Test execution: do paginated queries to read all the fixtures, one
	// event per page, feeding each returned checkpoint into the next query.
	eventsSeen := make(map[string]apievents.AuditEvent, numEvents)
	toTime := baseTime.Add(time.Hour)
	var checkpoint string

	for range numEvents {
		arr, nextCheckpoint, err := tt.log.SearchEvents(ctx, events.SearchEventsRequest{
			From:     baseTime,
			To:       toTime,
			Limit:    1,
			Order:    types.EventOrderAscending,
			StartKey: checkpoint,
		})
		require.NoError(t, err)
		require.Len(t, arr, 1)
		checkpoint = nextCheckpoint

		id := arr[0].GetID()
		var c checkpointKey
		require.NoError(t, json.Unmarshal([]byte(checkpoint), &c), "event %s", id)
		require.NotEmpty(t, c.Iterator, "event %s", id)

		// The iterator must decode into the typed key; decoding through
		// map[string]any is what used to lose int64 precision.
		var e EventKey
		require.NoError(t, json.Unmarshal([]byte(c.Iterator), &e), "event %s", id)

		// Fail fast if the cursor did not advance and the same event came back.
		require.NotContains(t, eventsSeen, id, "event %s was returned more than once", id)
		eventsSeen[id] = arr[0]
	}

	// Test validation: make sure that all fixtures were read (as opposed to
	// some event being returned several times because of a cursor issue).
	for id := range testEvents {
		require.Contains(t, eventsSeen, id, "eventsSeen should contain %q", id)
	}
}
Loading