diff --git a/lib/events/athena/athena.go b/lib/events/athena/athena.go
index dbbf800bb9362..a1a84abfd1d56 100644
--- a/lib/events/athena/athena.go
+++ b/lib/events/athena/athena.go
@@ -551,3 +551,7 @@ var (
 		consumerNumberOfErrorsFromSQSCollect,
 	}
 )
+
+type trimmableEvent interface {
+	TrimToMaxSize(int) apievents.AuditEvent
+}
diff --git a/lib/events/athena/publisher.go b/lib/events/athena/publisher.go
index 6518843c92462..23414334e2467 100644
--- a/lib/events/athena/publisher.go
+++ b/lib/events/athena/publisher.go
@@ -31,6 +31,7 @@ import (
 	"github.com/gravitational/trace"
 
 	apievents "github.com/gravitational/teleport/api/types/events"
+	"github.com/gravitational/teleport/lib/events"
 	"github.com/gravitational/teleport/lib/internal/context121"
 )
 
@@ -42,8 +43,15 @@ const (
 	// maxSNSMessageSize defines maximum size of SNS message. AWS allows 256KB
 	// however it counts also headers. We round it to 250KB, just to be sure.
 	maxSNSMessageSize = 250 * 1024
-	// maxS3BasedSize defines some resonable threshold for S3 based messages (2GB).
-	maxS3BasedSize uint64 = 2 * 1024 * 1024 * 1024
+)
+
+var (
+	// maxS3BasedSize defines a reasonable threshold for S3 based messages
+	// (almost 2GiB but fits in an int).
+	//
+	// It's a var instead of const so tests can override it instead of casually
+	// allocating 2GiB.
+	maxS3BasedSize = 2*1024*1024*1024 - 1
 )
 
 // publisher is a SNS based events publisher.
@@ -112,6 +120,24 @@ func (p *publisher) EmitAuditEvent(ctx context.Context, in apievents.AuditEvent)
 		in.SetTime(time.Now().UTC().Round(time.Millisecond))
 	}
 
+	// Attempt to trim the event to maxS3BasedSize. This is a no-op if the event
+	// is already small enough. If it cannot be trimmed, or the event is still
+	// too large after marshaling, then we may fail to emit the event below.
+	//
+	// This limit is much larger than events.MaxEventBytesInResponse and the
+	// event may need to be trimmed again on the querier side, but this is an
+	// attempt to preserve as much of the event as possible in case we add the
+	// ability to query very large events in the future.
+	if t, ok := in.(trimmableEvent); ok {
+		prevSize := in.Size()
+		// Trim to 3/4 the max size because base64 has 33% overhead.
+		// The TrimToMaxSize implementations have a 10% buffer already.
+		in = t.TrimToMaxSize(maxS3BasedSize - maxS3BasedSize/4)
+		if in.Size() != prevSize {
+			events.MetricStoredTrimmedEvents.Inc()
+		}
+	}
+
 	oneOf, err := apievents.ToOneOf(in)
 	if err != nil {
 		return trace.Wrap(err)
@@ -123,7 +149,7 @@ func (p *publisher) EmitAuditEvent(ctx context.Context, in apievents.AuditEvent)
 
 	b64Encoded := base64.StdEncoding.EncodeToString(marshaledProto)
 	if len(b64Encoded) > maxSNSMessageSize {
-		if uint64(len(b64Encoded)) > maxS3BasedSize {
+		if len(b64Encoded) > maxS3BasedSize {
 			return trace.BadParameter("message too large to publish, size %d", len(b64Encoded))
 		}
 		return trace.Wrap(p.emitViaS3(ctx, in.GetID(), marshaledProto))
diff --git a/lib/events/athena/publisher_test.go b/lib/events/athena/publisher_test.go
index eb7529cd595e3..062f1f7415502 100644
--- a/lib/events/athena/publisher_test.go
+++ b/lib/events/athena/publisher_test.go
@@ -29,15 +29,23 @@ import (
 	apievents "github.com/gravitational/teleport/api/types/events"
 )
 
+func init() {
+	// Override maxS3BasedSize so we don't have to allocate 2GiB to test it.
+	// Do this in init to avoid any race.
+	maxS3BasedSize = maxSNSMessageSize * 4
+}
+
 // TODO(tobiaszheller): Those UT just cover basic stuff. When we will have consumer
 // there will be UT which will cover whole flow of message with encoding/decoding.
 func Test_EmitAuditEvent(t *testing.T) {
+	veryLongString := strings.Repeat("t", maxS3BasedSize+1)
 	tests := []struct {
 		name          string
 		in            apievents.AuditEvent
 		publishErrors []error
 		uploader      s3uploader
 		wantCheck     func(t *testing.T, out []fakeQueueMessage)
+		wantErrorMsg  string
 	}{
 		{
 			name: "valid publish",
@@ -83,6 +91,33 @@ func Test_EmitAuditEvent(t *testing.T) {
 				require.Contains(t, *out[0].attributes[payloadTypeAttr].StringValue, payloadTypeS3Based)
 			},
 		},
+		{
+			name: "very big untrimmable event",
+			in: &apievents.AppCreate{
+				Metadata: apievents.Metadata{
+					ID: uuid.NewString(),
+					Time: time.Now().UTC(),
+					Code: veryLongString,
+				},
+			},
+			uploader: mockUploader{},
+			wantErrorMsg: "message too large to publish",
+		},
+		{
+			name: "very big trimmable event",
+			in: &apievents.DatabaseSessionQuery{
+				Metadata: apievents.Metadata{
+					ID: uuid.NewString(),
+					Time: time.Now().UTC(),
+				},
+				DatabaseQuery: veryLongString,
+			},
+			uploader: mockUploader{},
+			wantCheck: func(t *testing.T, out []fakeQueueMessage) {
+				require.Len(t, out, 1)
+				require.Contains(t, *out[0].attributes[payloadTypeAttr].StringValue, payloadTypeS3Based)
+			},
+		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -94,6 +129,10 @@ func Test_EmitAuditEvent(t *testing.T) {
 				},
 			}
 			err := p.EmitAuditEvent(context.Background(), tt.in)
+			if tt.wantErrorMsg != "" {
+				require.ErrorContains(t, err, tt.wantErrorMsg)
+				return
+			}
 			require.NoError(t, err)
 			out := fq.dequeue()
 			tt.wantCheck(t, out)
diff --git a/lib/events/athena/querier.go b/lib/events/athena/querier.go
index 1e276670abceb..3b9fc97ac472e 100644
--- a/lib/events/athena/querier.go
+++ b/lib/events/athena/querier.go
@@ -26,6 +26,7 @@ import (
 	"github.com/aws/aws-sdk-go-v2/aws"
 	"github.com/aws/aws-sdk-go-v2/service/athena"
 	athenaTypes "github.com/aws/aws-sdk-go-v2/service/athena/types"
+	"github.com/dustin/go-humanize"
 	"github.com/google/uuid"
 	"github.com/gravitational/trace"
 	"github.com/jonboulle/clockwork"
@@ -588,7 +589,9 @@ func (q *querier) fetchResults(ctx context.Context, queryId string, limit int, c
 		),
 	)
 	defer span.End()
-	rb := &responseBuilder{}
+	rb := &responseBuilder{
+		logger: q.logger,
+	}
 	// nextToken is used as offset to next calls for GetQueryResults.
 	var nextToken string
 	for {
@@ -644,6 +647,8 @@ type responseBuilder struct {
 	output []apievents.AuditEvent
 	// totalSize is used to track size of output
 	totalSize int
+
+	logger log.FieldLogger
 }
 
 func (r *responseBuilder) endKeyset() (*keyset, error) {
@@ -702,7 +707,51 @@ func (rb *responseBuilder) appendUntilSizeLimit(resultResp *athena.GetQueryResul
 		}
 
 		if len(eventData)+rb.totalSize > events.MaxEventBytesInResponse {
-			return true, nil
+			// Encountered an event that would push the total page over the size
+			// limit.
+			if len(rb.output) > 0 {
+				// There are already one or more full events to return, just
+				// return them and the next event will be picked up on the next
+				// page.
+				return true, nil
+			}
+			// A single event is larger than the max page size - the best we can
+			// do is try to trim it.
+			if t, ok := event.(trimmableEvent); ok {
+				event = t.TrimToMaxSize(events.MaxEventBytesInResponse)
+				events.MetricQueriedTrimmedEvents.Inc()
+				// Exact rb.totalSize doesn't really matter since the response is
+				// already size limited.
+				rb.totalSize += events.MaxEventBytesInResponse
+				rb.output = append(rb.output, event)
+				return true, nil
+			}
+			// Failed to trim the event to size. The only options are to return
+			// a response with 0 events, skip this event, or return an error.
+			//
+			// Silently skipping events is a terrible option; it's better for
+			// the client to get an error.
+			//
+			// Returning 0 events amounts to either skipping the event or
+			// getting the client stuck in a paging loop depending on what would
+			// be returned for the next page token.
+			//
+			// Returning a descriptive error should at least give the client a
+			// hint as to what has gone wrong so that an attempt can be made to
+			// fix it.
+			//
+			// If this condition is reached, it should be considered a bug; any
+			// event that can possibly exceed the maximum size should implement
+			// TrimToMaxSize (until we can one day implement an API for storing
+			// and retrieving large events).
+			rb.logger.WithFields(log.Fields{
+				"event_type": event.GetType(),
+				"event_id": event.GetID(),
+				"event_size": len(eventData),
+			}).Error("Failed to query event exceeding maximum response size.")
+			return true, trace.Errorf(
+				"%s event %s is %s and cannot be returned because it exceeds the maximum response size of %s",
+				event.GetType(), event.GetID(), humanize.IBytes(uint64(len(eventData))), humanize.IBytes(events.MaxEventBytesInResponse))
 		}
 		rb.totalSize += len(eventData)
 		rb.output = append(rb.output, event)
diff --git a/lib/events/athena/querier_test.go b/lib/events/athena/querier_test.go
index 92364070f25a5..8370344640a50 100644
--- a/lib/events/athena/querier_test.go
+++ b/lib/events/athena/querier_test.go
@@ -25,6 +25,7 @@ import (
 	"github.com/aws/aws-sdk-go-v2/aws"
 	"github.com/aws/aws-sdk-go-v2/service/athena"
 	athenaTypes "github.com/aws/aws-sdk-go-v2/service/athena/types"
+	"github.com/dustin/go-humanize"
 	"github.com/google/go-cmp/cmp"
 	"github.com/google/go-cmp/cmp/cmpopts"
 	"github.com/google/uuid"
@@ -727,7 +728,7 @@ func Test_querier_fetchResults(t *testing.T) {
 			AppName: "app-4",
 		},
 	}
-	veryBigEvent := &apievents.AppCreate{
+	bigUntrimmableEvent := &apievents.AppCreate{
 		Metadata: apievents.Metadata{
 			ID: uuid.NewString(),
 			Time: time.Now().UTC(),
@@ -737,6 +738,15 @@ func Test_querier_fetchResults(t *testing.T) {
 			AppName: strings.Repeat("aaaaa", events.MaxEventBytesInResponse),
 		},
 	}
+	bigTrimmableEvent := &apievents.DatabaseSessionQuery{
+		Metadata: apievents.Metadata{
+			ID: uuid.NewString(),
+			Time: time.Now().UTC(),
+			Type: events.DatabaseSessionQueryEvent,
+		},
+		DatabaseQuery: strings.Repeat("aaaaa", events.MaxEventBytesInResponse),
+	}
+	bigTrimmedEvent := bigTrimmableEvent.TrimToMaxSize(events.MaxEventBytesInResponse)
 	tests := []struct {
 		name      string
 		limit     int
@@ -744,9 +754,10 @@ func Test_querier_fetchResults(t *testing.T) {
 		// fakeResp defines responses which will be returned based on given
 		// input token to GetQueryResults. Note that due to limit of GetQueryResults
 		// we are doing multiple calls, first always with empty token.
-		fakeResp   map[string]eventsWithToken
-		wantEvents []apievents.AuditEvent
-		wantKeyset string
+		fakeResp     map[string]eventsWithToken
+		wantEvents   []apievents.AuditEvent
+		wantKeyset   string
+		wantErrorMsg string
 	}{
 		{
 			name: "no data returned from query, return empty results",
@@ -763,25 +774,45 @@ func Test_querier_fetchResults(t *testing.T) {
 			wantEvents: []apievents.AuditEvent{event1, event2, event3, event4},
 		},
 		{
-			name: "events with veryBigEvent exceeding > MaxEventBytesInResponse",
+			name: "events with untrimmable event exceeding > MaxEventBytesInResponse",
 			fakeResp: map[string]eventsWithToken{
 				"": {returnToken: "token1", events: []apievents.AuditEvent{event1}},
-				"token1": {returnToken: "", events: []apievents.AuditEvent{event2, event3, veryBigEvent}},
+				"token1": {returnToken: "", events: []apievents.AuditEvent{event2, event3, bigUntrimmableEvent}},
 			},
 			limit: 10,
-			// we don't expect veryBigEvent because it should go to next batch
+			// we don't expect bigUntrimmableEvent because it should go to next batch
 			wantEvents: []apievents.AuditEvent{event1, event2, event3},
 			wantKeyset: mustEventToKey(t, event3),
 		},
 		{
-			// TODO(tobiaszheller): right now if we have event that's > 1 MiB, it will be silently ignored (due to gRPC unary limit).
-			// Come back later when we have decision what to do with it.
-			name: "only 1 very big event",
+			name: "only 1 very big untrimmable event",
 			fakeResp: map[string]eventsWithToken{
-				"": {returnToken: "", events: []apievents.AuditEvent{veryBigEvent}},
+				"": {returnToken: "", events: []apievents.AuditEvent{bigUntrimmableEvent}},
+			},
+			limit: 10,
+			wantErrorMsg: fmt.Sprintf(
+				"app.create event %s is 5.0 MiB and cannot be returned because it exceeds the maximum response size of %s",
+				bigUntrimmableEvent.Metadata.ID, humanize.IBytes(events.MaxEventBytesInResponse)),
+		},
+		{
+			name: "events with trimmable event exceeding > MaxEventBytesInResponse",
+			fakeResp: map[string]eventsWithToken{
+				"": {returnToken: "token1", events: []apievents.AuditEvent{event1}},
+				"token1": {returnToken: "", events: []apievents.AuditEvent{event2, event3, bigTrimmableEvent}},
+			},
+			limit: 10,
+			// we don't expect bigTrimmableEvent because it should go to next batch
+			wantEvents: []apievents.AuditEvent{event1, event2, event3},
+			wantKeyset: mustEventToKey(t, event3),
+		},
+		{
+			name: "only 1 very big trimmable event",
+			fakeResp: map[string]eventsWithToken{
+				"": {returnToken: "", events: []apievents.AuditEvent{bigTrimmableEvent}},
 			},
 			limit: 10,
-			wantEvents: []apievents.AuditEvent{},
+			wantEvents: []apievents.AuditEvent{bigTrimmedEvent},
+			wantKeyset: mustEventToKey(t, bigTrimmableEvent),
 		},
 		{
 			name: "number of events equals limit in req, make sure that pagination keyset is returned",
@@ -818,8 +849,14 @@ func Test_querier_fetchResults(t *testing.T) {
 				},
 			}
 			gotEvents, gotKeyset, err := q.fetchResults(context.Background(), "queryid", tt.limit, tt.condition)
+			if tt.wantErrorMsg != "" {
+				require.ErrorContains(t, err, tt.wantErrorMsg)
+				return
+			}
 			require.NoError(t, err)
-			require.Empty(t, cmp.Diff(tt.wantEvents, gotEvents, cmpopts.EquateEmpty()))
+			require.Empty(t, cmp.Diff(tt.wantEvents, gotEvents, cmpopts.EquateEmpty(),
+				// Expect the database query to be trimmed
+				cmpopts.IgnoreFields(apievents.DatabaseSessionQuery{}, "DatabaseQuery")))
 			require.Equal(t, tt.wantKeyset, gotKeyset)
 		})
 	}
diff --git a/lib/events/auditlog.go b/lib/events/auditlog.go
index bb97cadefa21d..7d97222606483 100644
--- a/lib/events/auditlog.go
+++ b/lib/events/auditlog.go
@@ -148,7 +148,33 @@ var (
 		},
 	)
 
-	prometheusCollectors = []prometheus.Collector{auditOpenFiles, auditDiskUsed, auditFailedDisk, AuditFailedEmit, auditEmitEvent}
+	auditEmitEventSizes = prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Namespace: teleport.MetricNamespace,
+			Name: "audit_emitted_event_sizes",
+			Help: "Size of single events emitted",
+			Buckets: prometheus.ExponentialBucketsRange(64, 2*1024*1024*1024 /*2GiB*/, 16),
+		})
+
+	// MetricStoredTrimmedEvents counts the number of events that were trimmed
+	// before being stored.
+	MetricStoredTrimmedEvents = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Namespace: teleport.MetricNamespace,
+			Name: "audit_stored_trimmed_events",
+			Help: "Number of events that were trimmed before being stored",
+		})
+
+	// MetricQueriedTrimmedEvents counts the number of events that were trimmed
+	// before being returned from a query.
+	MetricQueriedTrimmedEvents = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Namespace: teleport.MetricNamespace,
+			Name: "audit_queried_trimmed_events",
+			Help: "Number of events that were trimmed before being returned from a query",
+		})
+
+	prometheusCollectors = []prometheus.Collector{auditOpenFiles, auditDiskUsed, auditFailedDisk, AuditFailedEmit, auditEmitEvent, auditEmitEventSizes, MetricStoredTrimmedEvents, MetricQueriedTrimmedEvents}
 )
 
 // AuditLog is a new combined facility to record Teleport events and
diff --git a/lib/events/dynamoevents/dynamoevents.go b/lib/events/dynamoevents/dynamoevents.go
index f2b11efe1cbab..a1aef7c040334 100644
--- a/lib/events/dynamoevents/dynamoevents.go
+++ b/lib/events/dynamoevents/dynamoevents.go
@@ -382,6 +382,7 @@ func (l *Log) handleAWSValidationError(ctx context.Context, err error, sessionID
 	}
 	fields := log.Fields{"event_id": in.GetID(), "event_type": in.GetType()}
 	l.WithFields(fields).Info("Uploaded trimmed event to DynamoDB backend.")
+	events.MetricStoredTrimmedEvents.Inc()
 	return nil
 }
 
diff --git a/lib/events/emitter.go b/lib/events/emitter.go
index dee7d582d2d62..5c27030bae8b4 100644
--- a/lib/events/emitter.go
+++ b/lib/events/emitter.go
@@ -164,6 +164,7 @@ func (w *CheckingEmitterConfig) CheckAndSetDefaults() error {
 func (r *CheckingEmitter) EmitAuditEvent(ctx context.Context, event apievents.AuditEvent) error {
 	ctx = context121.WithoutCancel(ctx)
 	auditEmitEvent.Inc()
+	auditEmitEventSizes.Observe(float64(event.Size()))
 	if err := checkAndSetEventFields(event, r.Clock, r.UIDGenerator, r.ClusterName); err != nil {
 		log.WithError(err).Errorf("Failed to emit audit event.")
 		AuditFailedEmit.Inc()
diff --git a/lib/events/filelog.go b/lib/events/filelog.go
index 3810a04e95d80..5033b4af66aac 100644
--- a/lib/events/filelog.go
+++ b/lib/events/filelog.go
@@ -164,6 +164,7 @@ func (l *FileLog) EmitAuditEvent(ctx context.Context, event apievents.AuditEvent
 		if err != nil {
 			return trace.Wrap(err)
 		}
+		MetricStoredTrimmedEvents.Inc()
 	default:
 		fields := log.Fields{"event_type": event.GetType(), "event_size": len(line)}
 		l.WithFields(fields).Warnf("Got a event that exceeded max allowed size.")
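
A note on the 3/4 trim budget in publisher.go (illustrative only, not part of the patch): standard base64 encodes every 3 input bytes as 4 output characters, so a marshaled event of N bytes becomes roughly 4N/3 characters once encoded. Trimming the event to maxS3BasedSize - maxS3BasedSize/4 therefore keeps the base64-encoded payload within maxS3BasedSize, assuming the trimmed event marshals to no more than the requested size. A minimal standalone sketch of that arithmetic, using hypothetical local names rather than the package's internals:

	package main

	import (
		"encoding/base64"
		"fmt"
	)

	func main() {
		// Stand-in for maxS3BasedSize; the trimmed, marshaled event is simulated
		// as a byte slice of exactly the publisher's trim target.
		const maxSize = 1 << 20
		target := maxSize - maxSize/4 // same 3/4 budget the publisher uses
		payload := make([]byte, target)

		encoded := base64.StdEncoding.EncodeToString(payload)
		fmt.Println(len(encoded), len(encoded) <= maxSize) // 1048576 true
	}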