From 663ae298a465eccd114f2e09ef6281b01d32ba31 Mon Sep 17 00:00:00 2001
From: Nic Klaassen
Date: Thu, 7 Dec 2023 06:29:47 -0800
Subject: [PATCH 1/2] [v13] fix: trim large events in Athena querier

Backport #35402 to branch/v13

Fixes #35161

Large events queried from the Athena audit backend will now be trimmed
before they are stored and before they are returned from a query,
according to the existing TrimToMaxSize implementations for each event
type already used by the Dynamo and File backends.

The other backends typically trim the event before storing it: for
Dynamo this is due to the 400 KB item size limit; for the file backend
it's due to the 64 KiB bufio.MaxScanTokenSize.

There is no hard limit on events stored in Parquet files in S3, but
we've been using a 2 GiB limit in the publisher so far. With this
change we will attempt to trim events to 2 GiB before writing them (if
we haven't already run out of memory) instead of just failing.

We've also been using a 1 MiB limit in the querier and just returning
an empty result when an event larger than that is encountered. With
this change we will attempt to trim the event to 1 MiB before returning
it. The 1 MiB limit ultimately stems from the 4 MB max gRPC message
size. We could just trim to 1 MiB in the publisher, but I'd prefer to
preserve as much of the event data as possible in case we improve the
querying story for large events in the future (and in case the user
wants to query the events directly from S3).
---
 lib/events/athena/athena.go         |  4 ++
 lib/events/athena/publisher.go      | 27 +++++++++++--
 lib/events/athena/publisher_test.go | 39 ++++++++++++++++++
 lib/events/athena/querier.go        | 52 +++++++++++++++++++++++-
 lib/events/athena/querier_test.go   | 63 +++++++++++++++++++++++------
 5 files changed, 167 insertions(+), 18 deletions(-)

diff --git a/lib/events/athena/athena.go b/lib/events/athena/athena.go
index dbbf800bb9362..a1a84abfd1d56 100644
--- a/lib/events/athena/athena.go
+++ b/lib/events/athena/athena.go
@@ -551,3 +551,7 @@ var (
 		consumerNumberOfErrorsFromSQSCollect,
 	}
 )
+
+type trimmableEvent interface {
+	TrimToMaxSize(int) apievents.AuditEvent
+}
diff --git a/lib/events/athena/publisher.go b/lib/events/athena/publisher.go
index 6518843c92462..f886caf6d3f47 100644
--- a/lib/events/athena/publisher.go
+++ b/lib/events/athena/publisher.go
@@ -42,8 +42,15 @@ const (
 	// maxSNSMessageSize defines maximum size of SNS message. AWS allows 256KB
 	// however it counts also headers. We round it to 250KB, just to be sure.
 	maxSNSMessageSize = 250 * 1024
-	// maxS3BasedSize defines some resonable threshold for S3 based messages (2GB).
-	maxS3BasedSize uint64 = 2 * 1024 * 1024 * 1024
+)
+
+var (
+	// maxS3BasedSize defines a reasonable threshold for S3 based messages
+	// (almost 2GiB but fits in an int).
+	//
+	// It's a var instead of const so tests can override it instead of casually
+	// allocating 2GiB.
+	maxS3BasedSize = 2*1024*1024*1024 - 1
 )

 // publisher is a SNS based events publisher.
@@ -112,6 +119,20 @@ func (p *publisher) EmitAuditEvent(ctx context.Context, in apievents.AuditEvent)
 		in.SetTime(time.Now().UTC().Round(time.Millisecond))
 	}

+	// Attempt to trim the event to maxS3BasedSize. This is a no-op if the event
+	// is already small enough. If it cannot be trimmed, or the event is still
+	// too large after marshaling, then we may fail to emit the event below.
+	//
+	// This limit is much larger than events.MaxEventBytesInResponse and the
+	// event may need to be trimmed again on the querier side, but this is an
+	// attempt to preserve as much of the event as possible in case we add the
+	// ability to query very large events in the future.
+	if t, ok := in.(trimmableEvent); ok {
+		// Trim to 3/4 the max size because base64 has 33% overhead.
+		// The TrimToMaxSize implementations have a 10% buffer already.
+		in = t.TrimToMaxSize(maxS3BasedSize - maxS3BasedSize/4)
+	}
+
 	oneOf, err := apievents.ToOneOf(in)
 	if err != nil {
 		return trace.Wrap(err)
@@ -123,7 +144,7 @@ func (p *publisher) EmitAuditEvent(ctx context.Context, in apievents.AuditEvent)

 	b64Encoded := base64.StdEncoding.EncodeToString(marshaledProto)
 	if len(b64Encoded) > maxSNSMessageSize {
-		if uint64(len(b64Encoded)) > maxS3BasedSize {
+		if len(b64Encoded) > maxS3BasedSize {
 			return trace.BadParameter("message too large to publish, size %d", len(b64Encoded))
 		}
 		return trace.Wrap(p.emitViaS3(ctx, in.GetID(), marshaledProto))
diff --git a/lib/events/athena/publisher_test.go b/lib/events/athena/publisher_test.go
index eb7529cd595e3..062f1f7415502 100644
--- a/lib/events/athena/publisher_test.go
+++ b/lib/events/athena/publisher_test.go
@@ -29,15 +29,23 @@ import (
 	apievents "github.com/gravitational/teleport/api/types/events"
 )

+func init() {
+	// Override maxS3BasedSize so we don't have to allocate 2GiB to test it.
+	// Do this in init to avoid any race.
+	maxS3BasedSize = maxSNSMessageSize * 4
+}
+
 // TODO(tobiaszheller): Those UT just cover basic stuff. When we will have consumer
 // there will be UT which will cover whole flow of message with encoding/decoding.
 func Test_EmitAuditEvent(t *testing.T) {
+	veryLongString := strings.Repeat("t", maxS3BasedSize+1)
 	tests := []struct {
 		name          string
 		in            apievents.AuditEvent
 		publishErrors []error
 		uploader      s3uploader
 		wantCheck     func(t *testing.T, out []fakeQueueMessage)
+		wantErrorMsg  string
 	}{
 		{
 			name: "valid publish",
@@ -83,6 +91,33 @@ func Test_EmitAuditEvent(t *testing.T) {
 				require.Contains(t, *out[0].attributes[payloadTypeAttr].StringValue, payloadTypeS3Based)
 			},
 		},
+		{
+			name: "very big untrimmable event",
+			in: &apievents.AppCreate{
+				Metadata: apievents.Metadata{
+					ID:   uuid.NewString(),
+					Time: time.Now().UTC(),
+					Code: veryLongString,
+				},
+			},
+			uploader:     mockUploader{},
+			wantErrorMsg: "message too large to publish",
+		},
+		{
+			name: "very big trimmable event",
+			in: &apievents.DatabaseSessionQuery{
+				Metadata: apievents.Metadata{
+					ID:   uuid.NewString(),
+					Time: time.Now().UTC(),
+				},
+				DatabaseQuery: veryLongString,
+			},
+			uploader: mockUploader{},
+			wantCheck: func(t *testing.T, out []fakeQueueMessage) {
+				require.Len(t, out, 1)
+				require.Contains(t, *out[0].attributes[payloadTypeAttr].StringValue, payloadTypeS3Based)
+			},
+		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -94,6 +129,10 @@ func Test_EmitAuditEvent(t *testing.T) {
 				},
 			}
 			err := p.EmitAuditEvent(context.Background(), tt.in)
+			if tt.wantErrorMsg != "" {
+				require.ErrorContains(t, err, tt.wantErrorMsg)
+				return
+			}
 			require.NoError(t, err)
 			out := fq.dequeue()
 			tt.wantCheck(t, out)
diff --git a/lib/events/athena/querier.go b/lib/events/athena/querier.go
index 1e276670abceb..7b1686204f73d 100644
--- a/lib/events/athena/querier.go
+++ b/lib/events/athena/querier.go
@@ -26,6 +26,7 @@ import (
 	"github.com/aws/aws-sdk-go-v2/aws"
 	"github.com/aws/aws-sdk-go-v2/service/athena"
 	athenaTypes "github.com/aws/aws-sdk-go-v2/service/athena/types"
+	"github.com/dustin/go-humanize"
 	"github.com/google/uuid"
 	"github.com/gravitational/trace"
 	"github.com/jonboulle/clockwork"
@@ -588,7 +589,9 @@ func (q *querier) fetchResults(ctx context.Context, queryId string, limit int, c
 		),
 	)
 	defer span.End()
-	rb := &responseBuilder{}
+	rb := &responseBuilder{
+		logger: q.logger,
+	}
 	// nextToken is used as offset to next calls for GetQueryResults.
 	var nextToken string
 	for {
@@ -644,6 +647,8 @@ type responseBuilder struct {
 	output []apievents.AuditEvent
 	// totalSize is used to track size of output
 	totalSize int
+
+	logger log.FieldLogger
 }

 func (r *responseBuilder) endKeyset() (*keyset, error) {
@@ -702,7 +707,50 @@ func (rb *responseBuilder) appendUntilSizeLimit(resultResp *athena.GetQueryResul
 	}

 	if len(eventData)+rb.totalSize > events.MaxEventBytesInResponse {
-		return true, nil
+		// Encountered an event that would push the total page over the size
+		// limit.
+		if len(rb.output) > 0 {
+			// There are already one or more full events to return, just
+			// return them and the next event will be picked up on the next
+			// page.
+			return true, nil
+		}
+		// A single event is larger than the max page size - the best we can
+		// do is try to trim it.
+		if t, ok := event.(trimmableEvent); ok {
+			event = t.TrimToMaxSize(events.MaxEventBytesInResponse)
+			// Exact rb.totalSize doesn't really matter since the response is
+			// already size limited.
+			rb.totalSize += events.MaxEventBytesInResponse
+			rb.output = append(rb.output, event)
+			return true, nil
+		}
+		// Failed to trim the event to size. The only options are to return
+		// a response with 0 events, skip this event, or return an error.
+		//
+		// Silently skipping events is a terrible option; it's better for
+		// the client to get an error.
+		//
+		// Returning 0 events amounts to either skipping the event or
+		// getting the client stuck in a paging loop, depending on what would
+		// be returned for the next page token.
+		//
+		// Returning a descriptive error should at least give the client a
+		// hint as to what has gone wrong so that an attempt can be made to
+		// fix it.
+		//
+		// If this condition is reached it should be considered a bug; any
+		// event that can possibly exceed the maximum size should implement
+		// TrimToMaxSize (until we can one day implement an API for storing
+		// and retrieving large events).
+		rb.logger.WithFields(log.Fields{
+			"event_type": event.GetType(),
+			"event_id":   event.GetID(),
+			"event_size": len(eventData),
+		}).Error("Failed to query event exceeding maximum response size.")
+		return true, trace.Errorf(
+			"%s event %s is %s and cannot be returned because it exceeds the maximum response size of %s",
+			event.GetType(), event.GetID(), humanize.IBytes(uint64(len(eventData))), humanize.IBytes(events.MaxEventBytesInResponse))
 	}
 	rb.totalSize += len(eventData)
 	rb.output = append(rb.output, event)
diff --git a/lib/events/athena/querier_test.go b/lib/events/athena/querier_test.go
index 92364070f25a5..8370344640a50 100644
--- a/lib/events/athena/querier_test.go
+++ b/lib/events/athena/querier_test.go
@@ -25,6 +25,7 @@ import (
 	"github.com/aws/aws-sdk-go-v2/aws"
 	"github.com/aws/aws-sdk-go-v2/service/athena"
 	athenaTypes "github.com/aws/aws-sdk-go-v2/service/athena/types"
+	"github.com/dustin/go-humanize"
 	"github.com/google/go-cmp/cmp"
 	"github.com/google/go-cmp/cmp/cmpopts"
 	"github.com/google/uuid"
@@ -727,7 +728,7 @@ func Test_querier_fetchResults(t *testing.T) {
 			AppName: "app-4",
 		},
 	}
-	veryBigEvent := &apievents.AppCreate{
+	bigUntrimmableEvent := &apievents.AppCreate{
 		Metadata: apievents.Metadata{
 			ID:   uuid.NewString(),
 			Time: time.Now().UTC(),
@@ -737,6 +738,15 @@ func Test_querier_fetchResults(t *testing.T) {
 			AppName: strings.Repeat("aaaaa", events.MaxEventBytesInResponse),
 		},
 	}
+	bigTrimmableEvent := &apievents.DatabaseSessionQuery{
+		Metadata: apievents.Metadata{
+			ID:   uuid.NewString(),
+			Time: time.Now().UTC(),
+			Type: events.DatabaseSessionQueryEvent,
+		},
+		DatabaseQuery: strings.Repeat("aaaaa", events.MaxEventBytesInResponse),
+	}
+	bigTrimmedEvent := bigTrimmableEvent.TrimToMaxSize(events.MaxEventBytesInResponse)
 	tests := []struct {
 		name  string
 		limit int
 		// fakeResp defines responses which will be returned based on given
 		// input token to GetQueryResults. Note that due to limit of GetQueryResults
 		// we are doing multiple calls, first always with empty token.
-		fakeResp   map[string]eventsWithToken
-		wantEvents []apievents.AuditEvent
-		wantKeyset string
+		fakeResp     map[string]eventsWithToken
+		wantEvents   []apievents.AuditEvent
+		wantKeyset   string
+		wantErrorMsg string
 	}{
 		{
 			name: "no data returned from query, return empty results",
@@ -763,25 +774,45 @@ func Test_querier_fetchResults(t *testing.T) {
 			wantEvents: []apievents.AuditEvent{event1, event2, event3, event4},
 		},
 		{
-			name: "events with veryBigEvent exceeding > MaxEventBytesInResponse",
+			name: "events with untrimmable event exceeding MaxEventBytesInResponse",
 			fakeResp: map[string]eventsWithToken{
 				"":       {returnToken: "token1", events: []apievents.AuditEvent{event1}},
-				"token1": {returnToken: "", events: []apievents.AuditEvent{event2, event3, veryBigEvent}},
+				"token1": {returnToken: "", events: []apievents.AuditEvent{event2, event3, bigUntrimmableEvent}},
 			},
 			limit: 10,
-			// we don't expect veryBigEvent because it should go to next batch
+			// we don't expect bigUntrimmableEvent because it should go to next batch
 			wantEvents: []apievents.AuditEvent{event1, event2, event3},
 			wantKeyset: mustEventToKey(t, event3),
 		},
 		{
-			// TODO(tobiaszheller): right now if we have event that's > 1 MiB, it will be silently ignored (due to gRPC unary limit).
-			// Come back later when we have decision what to do with it.
-			name: "only 1 very big event",
+			name: "only 1 very big untrimmable event",
 			fakeResp: map[string]eventsWithToken{
-				"": {returnToken: "", events: []apievents.AuditEvent{veryBigEvent}},
+				"": {returnToken: "", events: []apievents.AuditEvent{bigUntrimmableEvent}},
+			},
+			limit: 10,
+			wantErrorMsg: fmt.Sprintf(
+				"app.create event %s is 5.0 MiB and cannot be returned because it exceeds the maximum response size of %s",
+				bigUntrimmableEvent.Metadata.ID, humanize.IBytes(events.MaxEventBytesInResponse)),
+		},
+		{
+			name: "events with trimmable event exceeding MaxEventBytesInResponse",
+			fakeResp: map[string]eventsWithToken{
+				"":       {returnToken: "token1", events: []apievents.AuditEvent{event1}},
+				"token1": {returnToken: "", events: []apievents.AuditEvent{event2, event3, bigTrimmableEvent}},
+			},
+			limit: 10,
+			// we don't expect bigTrimmableEvent because it should go to next batch
+			wantEvents: []apievents.AuditEvent{event1, event2, event3},
+			wantKeyset: mustEventToKey(t, event3),
+		},
+		{
+			name: "only 1 very big trimmable event",
+			fakeResp: map[string]eventsWithToken{
+				"": {returnToken: "", events: []apievents.AuditEvent{bigTrimmableEvent}},
 			},
 			limit: 10,
-			wantEvents: []apievents.AuditEvent{},
+			wantEvents: []apievents.AuditEvent{bigTrimmedEvent},
+			wantKeyset: mustEventToKey(t, bigTrimmableEvent),
 		},
 		{
 			name: "number of events equals limit in req, make sure that pagination keyset is returned",
@@ -818,8 +849,14 @@ func Test_querier_fetchResults(t *testing.T) {
 			},
 		}
 		gotEvents, gotKeyset, err := q.fetchResults(context.Background(), "queryid", tt.limit, tt.condition)
+		if tt.wantErrorMsg != "" {
+			require.ErrorContains(t, err, tt.wantErrorMsg)
+			return
+		}
 		require.NoError(t, err)
-		require.Empty(t, cmp.Diff(tt.wantEvents, gotEvents, cmpopts.EquateEmpty()))
+		require.Empty(t, cmp.Diff(tt.wantEvents, gotEvents, cmpopts.EquateEmpty(),
+			// Expect the database query to be trimmed
+			cmpopts.IgnoreFields(apievents.DatabaseSessionQuery{}, "DatabaseQuery")))
 		require.Equal(t, tt.wantKeyset, gotKeyset)
 	})
 }

From 970cd448574d7a7d4f4a32a2675996001cbd6d32 Mon Sep 17 00:00:00 2001
From: rosstimothy <39066650+rosstimothy@users.noreply.github.com>
Date: Thu, 7 Dec 2023 14:04:27 -0500
Subject: [PATCH 2/2] [v13] feat: add metrics for event sizes

Backport #35440 to branch/v13
---
 lib/events/athena/publisher.go          |  5 +++++
 lib/events/athena/querier.go            |  1 +
 lib/events/auditlog.go                  | 28 ++++++++++++++++++++++++-
 lib/events/dynamoevents/dynamoevents.go |  1 +
 lib/events/emitter.go                   |  1 +
 lib/events/filelog.go                   |  1 +
 6 files changed, 36 insertions(+), 1 deletion(-)

diff --git a/lib/events/athena/publisher.go b/lib/events/athena/publisher.go
index f886caf6d3f47..23414334e2467 100644
--- a/lib/events/athena/publisher.go
+++ b/lib/events/athena/publisher.go
@@ -31,6 +31,7 @@ import (
 	"github.com/gravitational/trace"

 	apievents "github.com/gravitational/teleport/api/types/events"
+	"github.com/gravitational/teleport/lib/events"
 	"github.com/gravitational/teleport/lib/internal/context121"
 )

@@ -128,9 +129,13 @@ func (p *publisher) EmitAuditEvent(ctx context.Context, in apievents.AuditEvent)
 	// attempt to preserve as much of the event as possible in case we add the
 	// ability to query very large events in the future.
 	if t, ok := in.(trimmableEvent); ok {
+		prevSize := in.Size()
 		// Trim to 3/4 the max size because base64 has 33% overhead.
 		// The TrimToMaxSize implementations have a 10% buffer already.
 		in = t.TrimToMaxSize(maxS3BasedSize - maxS3BasedSize/4)
+		if in.Size() != prevSize {
+			events.MetricStoredTrimmedEvents.Inc()
+		}
 	}

 	oneOf, err := apievents.ToOneOf(in)
diff --git a/lib/events/athena/querier.go b/lib/events/athena/querier.go
index 7b1686204f73d..3b9fc97ac472e 100644
--- a/lib/events/athena/querier.go
+++ b/lib/events/athena/querier.go
@@ -719,6 +719,7 @@ func (rb *responseBuilder) appendUntilSizeLimit(resultResp *athena.GetQueryResul
 		// do is try to trim it.
 		if t, ok := event.(trimmableEvent); ok {
 			event = t.TrimToMaxSize(events.MaxEventBytesInResponse)
+			events.MetricQueriedTrimmedEvents.Inc()
 			// Exact rb.totalSize doesn't really matter since the response is
 			// already size limited.
 			rb.totalSize += events.MaxEventBytesInResponse
diff --git a/lib/events/auditlog.go b/lib/events/auditlog.go
index bb97cadefa21d..7d97222606483 100644
--- a/lib/events/auditlog.go
+++ b/lib/events/auditlog.go
@@ -148,7 +148,33 @@ var (
 		},
 	)

-	prometheusCollectors = []prometheus.Collector{auditOpenFiles, auditDiskUsed, auditFailedDisk, AuditFailedEmit, auditEmitEvent}
+	auditEmitEventSizes = prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Namespace: teleport.MetricNamespace,
+			Name:      "audit_emitted_event_sizes",
+			Help:      "Size of single events emitted",
+			Buckets:   prometheus.ExponentialBucketsRange(64, 2*1024*1024*1024 /*2GiB*/, 16),
+		})
+
+	// MetricStoredTrimmedEvents counts the number of events that were trimmed
+	// before being stored.
+	MetricStoredTrimmedEvents = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Namespace: teleport.MetricNamespace,
+			Name:      "audit_stored_trimmed_events",
+			Help:      "Number of events that were trimmed before being stored",
+		})
+
+	// MetricQueriedTrimmedEvents counts the number of events that were trimmed
+	// before being returned from a query.
+	MetricQueriedTrimmedEvents = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Namespace: teleport.MetricNamespace,
+			Name:      "audit_queried_trimmed_events",
+			Help:      "Number of events that were trimmed before being returned from a query",
+		})
+
+	prometheusCollectors = []prometheus.Collector{auditOpenFiles, auditDiskUsed, auditFailedDisk, AuditFailedEmit, auditEmitEvent, auditEmitEventSizes, MetricStoredTrimmedEvents, MetricQueriedTrimmedEvents}
 )

 // AuditLog is a new combined facility to record Teleport events and
diff --git a/lib/events/dynamoevents/dynamoevents.go b/lib/events/dynamoevents/dynamoevents.go
index f2b11efe1cbab..a1aef7c040334 100644
--- a/lib/events/dynamoevents/dynamoevents.go
+++ b/lib/events/dynamoevents/dynamoevents.go
@@ -382,6 +382,7 @@ func (l *Log) handleAWSValidationError(ctx context.Context, err error, sessionID
 	}
 	fields := log.Fields{"event_id": in.GetID(), "event_type": in.GetType()}
 	l.WithFields(fields).Info("Uploaded trimmed event to DynamoDB backend.")
+	events.MetricStoredTrimmedEvents.Inc()
 	return nil
 }
diff --git a/lib/events/emitter.go b/lib/events/emitter.go
index dee7d582d2d62..5c27030bae8b4 100644
--- a/lib/events/emitter.go
+++ b/lib/events/emitter.go
@@ -164,6 +164,7 @@ func (w *CheckingEmitterConfig) CheckAndSetDefaults() error {
 func (r *CheckingEmitter) EmitAuditEvent(ctx context.Context, event apievents.AuditEvent) error {
 	ctx = context121.WithoutCancel(ctx)
 	auditEmitEvent.Inc()
+	auditEmitEventSizes.Observe(float64(event.Size()))
 	if err := checkAndSetEventFields(event, r.Clock, r.UIDGenerator, r.ClusterName); err != nil {
 		log.WithError(err).Errorf("Failed to emit audit event.")
 		AuditFailedEmit.Inc()
diff --git a/lib/events/filelog.go b/lib/events/filelog.go
index 3810a04e95d80..5033b4af66aac 100644
--- a/lib/events/filelog.go
+++ b/lib/events/filelog.go
@@ -164,6 +164,7 @@ func (l *FileLog) EmitAuditEvent(ctx context.Context, event apievents.AuditEvent
 		if err != nil {
 			return trace.Wrap(err)
 		}
+		MetricStoredTrimmedEvents.Inc()
 	default:
 		fields := log.Fields{"event_type": event.GetType(), "event_size": len(line)}
 		l.WithFields(fields).Warnf("Got a event that exceeded max allowed size.")
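For readers who want the publisher-side arithmetic in isolation, here is a
minimal, runnable Go sketch of the pattern patch 1 relies on: trim first,
base64-encode, and only fail if the payload is still over budget. Everything
in it is an illustrative stand-in rather than Teleport code — the event type,
field, and sizes are invented; only the TrimToMaxSize shape mirrors the
trimmableEvent interface added in athena.go.

// Illustrative sketch only; not part of the patch. The auditEvent type and
// maxEncodedSize constant are hypothetical stand-ins.
package main

import (
	"encoding/base64"
	"fmt"
)

// auditEvent is a hypothetical stand-in for apievents.AuditEvent.
type auditEvent struct {
	query string
}

// TrimToMaxSize mirrors the shape of the trimmableEvent interface: it returns
// a copy of the event whose payload fits within maxSize bytes.
func (e auditEvent) TrimToMaxSize(maxSize int) auditEvent {
	if len(e.query) > maxSize {
		e.query = e.query[:maxSize]
	}
	return e
}

func main() {
	const maxEncodedSize = 100 // stand-in for maxS3BasedSize

	event := auditEvent{query: string(make([]byte, 500))}

	// Base64 encodes 3 raw bytes as 4 characters, so n raw bytes become
	// roughly 4n/3 encoded characters. Trimming the raw payload to 3/4 of
	// the encoded budget keeps the encoded form within it: (3/4)n * 4/3 = n.
	event = event.TrimToMaxSize(maxEncodedSize - maxEncodedSize/4)

	encoded := base64.StdEncoding.EncodeToString([]byte(event.query))
	if len(encoded) > maxEncodedSize {
		fmt.Println("still too large, refusing to publish")
		return
	}
	fmt.Printf("publishing %d raw bytes as %d encoded bytes (budget %d)\n",
		len(event.query), len(encoded), maxEncodedSize)
}

With maxEncodedSize = 100, the trim leaves 75 raw bytes, which encode to
exactly 100 base64 characters — the same reasoning behind the
"maxS3BasedSize - maxS3BasedSize/4" expression in the patch.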
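Patch 2 instruments this flow with a histogram observed on every emit and
counters bumped only when an event was actually trimmed. The sketch below
shows that metric pattern in a self-contained form, assuming only the
prometheus/client_golang API; the namespace, metric names, and sizes here are
stand-ins, and real Teleport registration goes through its own
prometheusCollectors list rather than a fresh registry.

// Illustrative sketch only; not part of the patch.
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

var (
	emittedEventSizes = prometheus.NewHistogram(prometheus.HistogramOpts{
		Namespace: "example",
		Name:      "audit_emitted_event_sizes",
		Help:      "Size of single events emitted",
		// 16 exponential buckets spanning 64 B to 2 GiB, as in the patch.
		Buckets: prometheus.ExponentialBucketsRange(64, 2*1024*1024*1024, 16),
	})
	storedTrimmedEvents = prometheus.NewCounter(prometheus.CounterOpts{
		Namespace: "example",
		Name:      "audit_stored_trimmed_events",
		Help:      "Number of events that were trimmed before being stored",
	})
)

func main() {
	reg := prometheus.NewRegistry()
	reg.MustRegister(emittedEventSizes, storedTrimmedEvents)

	// Pretend trimming shrank the event, as in the prevSize check the
	// patch adds to the publisher.
	prevSize, newSize := 5000, 3000
	emittedEventSizes.Observe(float64(newSize))
	if newSize != prevSize {
		storedTrimmedEvents.Inc()
	}
	fmt.Println("recorded one emit and one trim")
}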