diff --git a/docs/cspell.json b/docs/cspell.json index 2792208120d12..49618c40edfb4 100644 --- a/docs/cspell.json +++ b/docs/cspell.json @@ -554,6 +554,7 @@ "pagerdutyapikey", "pagerdutyapikeyfromsecret", "pastable", + "parquetlog", "pasteable", "pgaadauth", "pidof", diff --git a/docs/pages/includes/metrics.mdx b/docs/pages/includes/metrics.mdx index 787717602ca6e..5d2a9caebdcbd 100644 --- a/docs/pages/includes/metrics.mdx +++ b/docs/pages/includes/metrics.mdx @@ -1,59 +1,67 @@ ## Auth Service and backends -| Name | Type | Component | Description | -| ----------------------------------------------- | ----------- | ------------------- | -------------------------------------------------------------------------------------------------- | -| `audit_failed_disk_monitoring` | counter | Teleport Audit Log | Number of times disk monitoring failed. | -| `audit_failed_emit_events` | counter | Teleport Audit Log | Number of times emitting audit events failed. | -| `audit_percentage_disk_space_used` | gauge | Teleport Audit Log | Percentage of disk space used. | -| `audit_server_open_files` | gauge | Teleport Audit Log | Number of open audit files. | -| `auth_generate_requests_throttled_total` | counter | Teleport Auth | Number of throttled requests to generate new server keys. | -| `auth_generate_requests_total` | counter | Teleport Auth | Number of requests to generate new server keys. | -| `auth_generate_requests` | gauge | Teleport Auth | Number of current generate requests. | -| `auth_generate_seconds` | histogram | Teleport Auth | Latency for generate requests. | -| `backend_batch_read_requests_total` | counter | cache | Number of read requests to the backend. | -| `backend_batch_read_seconds` | histogram | cache | Latency for batch read operations. | -| `backend_batch_write_requests_total` | counter | cache | Number of batch write requests to the backend. | -| `backend_batch_write_seconds` | histogram | cache | Latency for backend batch write operations. 
| -| `backend_read_requests_total` | counter | cache | Number of read requests to the backend. | -| `backend_read_seconds` | histogram | cache | Latency for read operations. | -| `backend_requests` | counter | cache | Number of write requests to the backend. | -| `backend_write_seconds` | histogram | cache | Latency for backend write operations. | -| `cluster_name_not_found_total` | counter | Teleport Auth | Number of times a cluster was not found. | -| `dynamo_requests_total` | counter | DynamoDB | Total number of requests to the DynamoDB API. | -| `dynamo_requests` | counter | DynamoDB | Total number of requests to the DynamoDB API grouped by result. | -| `dynamo_requests_seconds` | histogram | DynamoDB | Latency of DynamoDB API requests. | -| `etcd_backend_batch_read_requests` | counter | etcd | Number of read requests to the etcd database. | -| `etcd_backend_batch_read_seconds` | histogram | etcd | Latency for etcd read operations. | -| `etcd_backend_read_requests` | counter | etcd | Number of read requests to the etcd database. | -| `etcd_backend_read_seconds` | histogram | etcd | Latency for etcd read operations. | -| `etcd_backend_tx_requests` | counter | etcd | Number of transaction requests to the database. | -| `etcd_backend_tx_seconds` | histogram | etcd | Latency for etcd transaction operations. | -| `etcd_backend_write_requests` | counter | etcd | Number of write requests to the database. | -| `etcd_backend_write_seconds` | histogram | etcd | Latency for etcd write operations. | -| `firestore_events_backend_batch_read_requests` | counter | GCP Cloud Firestore | Number of batch read requests to Cloud Firestore events. | -| `firestore_events_backend_batch_read_seconds` | histogram | GCP Cloud Firestore | Latency for Cloud Firestore events batch read operations. | -| `firestore_events_backend_batch_write_requests` | counter | GCP Cloud Firestore | Number of batch write requests to Cloud Firestore events. 
| -| `firestore_events_backend_batch_write_seconds` | histogram | GCP Cloud Firestore | Latency for Cloud Firestore events batch write operations. | -| `firestore_events_backend_write_requests` | counter | GCP Cloud Firestore | Number of write requests to Cloud Firestore events. | -| `firestore_events_backend_write_seconds` | histogram | GCP Cloud Firestore | Latency for Cloud Firestore events write operations. | -| `gcs_event_storage_downloads_seconds` | histogram | GCP GCS | Latency for GCS download operations. | -| `gcs_event_storage_downloads` | counter | GCP GCS | Number of downloads from the GCS backend. | -| `gcs_event_storage_uploads_seconds` | histogram | GCP GCS | Latency for GCS upload operations. | -| `gcs_event_storage_uploads` | counter | GCP GCS | Number of uploads to the GCS backend. | -| `grpc_server_started_total` | counter | Teleport Auth | Total number of RPCs started on the server. | -| `grpc_server_handled_total` | counter | Teleport Auth | Total number of RPCs completed on the server, regardless of success or failure. | -| `grpc_server_msg_received_total` | counter | Teleport Auth | Total number of RPC stream messages received on the server. | -| `grpc_server_msg_sent_total` | counter | Teleport Auth | Total number of gRPC stream messages sent by the server. | -| `heartbeat_connections_received_total` | counter | Teleport Auth | Number of times the Auth Service received a heartbeat connection. | -| `s3_requests_total` | counter | Amazon S3 | Total number of requests to the S3 API. | -| `s3_requests` | counter | Amazon S3 | Total number of requests to the S3 API grouped by result. | -| `s3_requests_seconds` | histogram | Amazon S3 | Request latency for the S3 API. | -| `teleport_audit_emit_events` | counter | Teleport Audit Log | Number of audit events emitted. | -| `teleport_connected_resources` | gauge | Teleport Auth | Number and type of resources connected via keepalives. 
| -| `teleport_registered_servers` | gauge | Teleport Auth | The number of Teleport services that are connected to an Auth Service instance grouped by version. | -| `user_login_total` | counter | Teleport Auth | Number of user logins. | -| `teleport_migrations` | gauge | Teleport Auth | Tracks for each migration if it is active (1) or not (0). | -| `watcher_event_sizes` | histogram | cache | Overall size of events emitted. | -| `watcher_events` | histogram | cache | Per resource size of events emitted. | +| Name | Type | Component | Description | +|----------------------------------------------------------|-----------|---------------------|----------------------------------------------------------------------------------------------------| +| `audit_failed_disk_monitoring` | counter | Teleport Audit Log | Number of times disk monitoring failed. | +| `audit_failed_emit_events` | counter | Teleport Audit Log | Number of times emitting audit events failed. | +| `audit_percentage_disk_space_used` | gauge | Teleport Audit Log | Percentage of disk space used. | +| `audit_server_open_files` | gauge | Teleport Audit Log | Number of open audit files. | +| `auth_generate_requests_throttled_total` | counter | Teleport Auth | Number of throttled requests to generate new server keys. | +| `auth_generate_requests_total` | counter | Teleport Auth | Number of requests to generate new server keys. | +| `auth_generate_requests` | gauge | Teleport Auth | Number of current generate requests. | +| `auth_generate_seconds` | histogram | Teleport Auth | Latency for generate requests. | +| `backend_batch_read_requests_total` | counter | cache | Number of read requests to the backend. | +| `backend_batch_read_seconds` | histogram | cache | Latency for batch read operations. | +| `backend_batch_write_requests_total` | counter | cache | Number of batch write requests to the backend. | +| `backend_batch_write_seconds` | histogram | cache | Latency for backend batch write operations. 
| +| `backend_read_requests_total` | counter | cache | Number of read requests to the backend. | +| `backend_read_seconds` | histogram | cache | Latency for read operations. | +| `backend_requests` | counter | cache | Number of write requests to the backend. | +| `backend_write_seconds` | histogram | cache | Latency for backend write operations. | +| `cluster_name_not_found_total` | counter | Teleport Auth | Number of times a cluster was not found. | +| `dynamo_requests_total` | counter | DynamoDB | Total number of requests to the DynamoDB API. | +| `dynamo_requests` | counter | DynamoDB | Total number of requests to the DynamoDB API grouped by result. | +| `dynamo_requests_seconds` | histogram | DynamoDB | Latency of DynamoDB API requests. | +| `etcd_backend_batch_read_requests` | counter | etcd | Number of read requests to the etcd database. | +| `etcd_backend_batch_read_seconds` | histogram | etcd | Latency for etcd read operations. | +| `etcd_backend_read_requests` | counter | etcd | Number of read requests to the etcd database. | +| `etcd_backend_read_seconds` | histogram | etcd | Latency for etcd read operations. | +| `etcd_backend_tx_requests` | counter | etcd | Number of transaction requests to the database. | +| `etcd_backend_tx_seconds` | histogram | etcd | Latency for etcd transaction operations. | +| `etcd_backend_write_requests` | counter | etcd | Number of write requests to the database. | +| `etcd_backend_write_seconds` | histogram | etcd | Latency for etcd write operations. | +| `firestore_events_backend_batch_read_requests` | counter | GCP Cloud Firestore | Number of batch read requests to Cloud Firestore events. | +| `firestore_events_backend_batch_read_seconds` | histogram | GCP Cloud Firestore | Latency for Cloud Firestore events batch read operations. | +| `firestore_events_backend_batch_write_requests` | counter | GCP Cloud Firestore | Number of batch write requests to Cloud Firestore events. 
| +| `firestore_events_backend_batch_write_seconds` | histogram | GCP Cloud Firestore | Latency for Cloud Firestore events batch write operations. | +| `firestore_events_backend_write_requests` | counter | GCP Cloud Firestore | Number of write requests to Cloud Firestore events. | +| `firestore_events_backend_write_seconds` | histogram | GCP Cloud Firestore | Latency for Cloud Firestore events write operations. | +| `gcs_event_storage_downloads_seconds` | histogram | GCP GCS | Latency for GCS download operations. | +| `gcs_event_storage_downloads` | counter | GCP GCS | Number of downloads from the GCS backend. | +| `gcs_event_storage_uploads_seconds` | histogram | GCP GCS | Latency for GCS upload operations. | +| `gcs_event_storage_uploads` | counter | GCP GCS | Number of uploads to the GCS backend. | +| `grpc_server_started_total` | counter | Teleport Auth | Total number of RPCs started on the server. | +| `grpc_server_handled_total` | counter | Teleport Auth | Total number of RPCs completed on the server, regardless of success or failure. | +| `grpc_server_msg_received_total` | counter | Teleport Auth | Total number of RPC stream messages received on the server. | +| `grpc_server_msg_sent_total` | counter | Teleport Auth | Total number of gRPC stream messages sent by the server. | +| `heartbeat_connections_received_total` | counter | Teleport Auth | Number of times the Auth Service received a heartbeat connection. | +| `s3_requests_total` | counter | Amazon S3 | Total number of requests to the S3 API. | +| `s3_requests` | counter | Amazon S3 | Total number of requests to the S3 API grouped by result. | +| `s3_requests_seconds` | histogram | Amazon S3 | Request latency for the S3 API. | +| `teleport_audit_emit_events` | counter | Teleport Audit Log | Number of audit events emitted. | +| `teleport_audit_parquetlog_batch_processing_seconds` | histogram | Teleport Audit Log | Duration of processing single batch of events in the Parquet-format audit log. 
| `teleport_audit_parquetlog_s3_flush_seconds`             | histogram | Teleport Audit Log  | Duration of flushing parquet files to S3 in Parquet-format audit log.                               |
+| `teleport_audit_parquetlog_delete_events_seconds`        | histogram | Teleport Audit Log  | Duration of deleting events from SQS in the Parquet-format audit log.                               |
+| `teleport_audit_parquetlog_batch_size`                    | histogram | Teleport Audit Log  | Overall size of events in single batch in Parquet-format audit log.                                 |
+| `teleport_audit_parquetlog_batch_count`                   | counter   | Teleport Audit Log  | Total number of events in single batch in Parquet-format audit log.                                 |
+| `teleport_audit_parquetlog_last_processed_timestamp`      | gauge     | Teleport Audit Log  | Timestamp of the last processed event batch in the Parquet-format audit log.                        |
+| `teleport_audit_parquetlog_age_oldest_processed_message`  | gauge     | Teleport Audit Log  | Age in seconds of the oldest processed event in the Parquet-format audit log.                       |
+| `teleport_audit_parquetlog_errors_from_collect_count`     | counter   | Teleport Audit Log  | Number of collect failures in Parquet-format audit log.                                             |
+| `teleport_connected_resources`                             | gauge     | Teleport Auth       | Number and type of resources connected via keepalives.                                              |
+| `teleport_registered_servers`                              | gauge     | Teleport Auth       | The number of Teleport services that are connected to an Auth Service instance grouped by version.  |
+| `user_login_total`                                         | counter   | Teleport Auth       | Number of user logins.                                                                               |
+| `teleport_migrations`                                      | gauge     | Teleport Auth       | Tracks for each migration if it is active (1) or not (0).                                            |
+| `watcher_event_sizes`                                      | histogram | cache               | Overall size of events emitted.                                                                      |
+| `watcher_events`                                           | histogram | cache               | Per resource size of events emitted. 
| ## Enhanced Session Recording / BPF diff --git a/lib/events/athena/athena.go b/lib/events/athena/athena.go index 3d2ef0389c53f..3543b5242ea41 100644 --- a/lib/events/athena/athena.go +++ b/lib/events/athena/athena.go @@ -27,12 +27,14 @@ import ( awsconfig "github.com/aws/aws-sdk-go-v2/config" "github.com/gravitational/trace" "github.com/jonboulle/clockwork" + "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" "github.com/gravitational/teleport" apievents "github.com/gravitational/teleport/api/types/events" "github.com/gravitational/teleport/lib/backend" "github.com/gravitational/teleport/lib/events" + "github.com/gravitational/teleport/lib/observability/metrics" "github.com/gravitational/teleport/lib/utils" ) @@ -361,6 +363,14 @@ func New(ctx context.Context, cfg Config) (*Log, error) { return nil, trace.Wrap(err) } + // metricConsumerBatchProcessingDuration is defined after checking config, because + // its bucket depends on batchMaxInterval. + metricConsumerBatchProcessingDuration := metricConsumerBatchProcessingDuration(cfg.BatchMaxInterval) + + if err := metrics.RegisterPrometheusCollectors(append(prometheusCollectors, metricConsumerBatchProcessingDuration)...); err != nil { + return nil, trace.Wrap(err) + } + querier, err := newQuerier(querierConfig{ tablename: cfg.TableName, database: cfg.Database, @@ -377,7 +387,7 @@ func New(ctx context.Context, cfg Config) (*Log, error) { consumerCtx, consumerCancel := context.WithCancel(ctx) - consumer, err := newConsumer(cfg, consumerCancel) + consumer, err := newConsumer(cfg, consumerCancel, metricConsumerBatchProcessingDuration) if err != nil { return nil, trace.Wrap(err) } @@ -425,3 +435,91 @@ func isValidUrlWithScheme(s string) (string, bool) { } return u.Scheme, true } + +func metricConsumerBatchProcessingDuration(batchInterval time.Duration) prometheus.Histogram { + batchSeconds := batchInterval.Seconds() + return prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: 
teleport.MetricNamespace,
+			Name:      teleport.MetricParquetlogConsumerBatchPorcessingDuration,
+			Help:      "Duration of processing single batch of events in parquetlog",
+			// For 60s batch interval it will look like:
+			// 6.00, 12.00, 30.00, 45.00, 54.00, 59.01, 64.48, 70.47, 77.01, 84.15, 91.96, 100.49, 109.81, 120.00
+			// We want some visibility if batch takes very small amount of time, but we are mostly interested
+			// in range from 0.9*batch to 2*batch.
+			Buckets: append([]float64{0.1 * batchSeconds, 0.2 * batchSeconds, 0.5 * batchSeconds, 0.75 * batchSeconds}, prometheus.ExponentialBucketsRange(0.9*batchSeconds, 2*batchSeconds, 10)...),
+		},
+	)
+}
+
+var (
+	consumerS3parquetFlushDuration = prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Namespace: teleport.MetricNamespace,
+			Name:      teleport.MetricParquetlogConsumerS3FlushDuration,
+			Help:      "Duration of flush and close of s3 parquet files in parquetlog",
+			// lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
+			// highest bucket start of 0.001 sec * 2^15 == 32.768 sec
+			Buckets: prometheus.ExponentialBuckets(0.001, 2, 16),
+		},
+	)
+
+	consumerDeleteMessageDuration = prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Namespace: teleport.MetricNamespace,
+			Name:      teleport.MetricParquetlogConsumerDeleteEventsDuration,
+			Help:      "Duration of deletion of events on SQS in parquetlog",
+			// lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
+			// highest bucket start of 0.001 sec * 2^15 == 32.768 sec
+			Buckets: prometheus.ExponentialBuckets(0.001, 2, 16),
+		},
+	)
+
+	consumerBatchSize = prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Namespace: teleport.MetricNamespace,
+			Name:      teleport.MetricParquetlogConsumerBatchSize,
+			Help:      "Size of single batch of events in parquetlog",
+			Buckets:   prometheus.ExponentialBucketsRange(200, 100*1024*1024 /* 100 MB*/, 10),
+		},
+	)
+
+	consumerBatchCount = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Namespace: teleport.MetricNamespace,
Name: teleport.MetricParquetlogConsumerBatchCount, + Help: "Number of events in single batch in parquetlog", + }, + ) + + consumerLastProcessedTimestamp = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: teleport.MetricNamespace, + Name: teleport.MetricParquetlogConsumerLastProcessedTimestamp, + Help: "Timestamp of last finished consumer execution", + }, + ) + + consumerAgeOfOldestProcessedMessage = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: teleport.MetricNamespace, + Name: teleport.MetricParquetlogConsumerOldestProcessedMessage, + Help: "Age of oldest processed message in seconds", + }, + ) + + consumerNumberOfErrorsFromSQSCollect = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: teleport.MetricNamespace, + Name: teleport.MetricParquetlogConsumerCollectFailed, + Help: "Number of errors received from sqs collect", + }, + ) + + prometheusCollectors = []prometheus.Collector{ + consumerS3parquetFlushDuration, consumerDeleteMessageDuration, + consumerBatchSize, consumerBatchCount, + consumerLastProcessedTimestamp, consumerAgeOfOldestProcessedMessage, + consumerNumberOfErrorsFromSQSCollect, + } +) diff --git a/lib/events/athena/consumer.go b/lib/events/athena/consumer.go index bc83e21b7491b..ec09965d0005c 100644 --- a/lib/events/athena/consumer.go +++ b/lib/events/athena/consumer.go @@ -31,6 +31,7 @@ import ( sqsTypes "github.com/aws/aws-sdk-go-v2/service/sqs/types" "github.com/google/uuid" "github.com/gravitational/trace" + "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" "github.com/xitongsys/parquet-go-source/s3v2" "github.com/xitongsys/parquet-go/parquet" @@ -98,7 +99,7 @@ type s3downloader interface { Download(ctx context.Context, w io.WriterAt, input *s3.GetObjectInput, options ...func(*manager.Downloader)) (n int64, err error) } -func newConsumer(cfg Config, cancelFn context.CancelFunc) (*consumer, error) { +func newConsumer(cfg Config, cancelFn context.CancelFunc, 
metricConsumerBatchProcessingDuration prometheus.Histogram) (*consumer, error) { s3client := s3.NewFromConfig(*cfg.AWSConfig) sqsClient := sqs.NewFromConfig(*cfg.AWSConfig) @@ -106,12 +107,13 @@ func newConsumer(cfg Config, cancelFn context.CancelFunc) (*consumer, error) { sqsReceiver: sqsClient, queueURL: cfg.QueueURL, // TODO(tobiaszheller): use s3 manager from teleport observability. - payloadDownloader: manager.NewDownloader(s3client), - payloadBucket: cfg.largeEventsBucket, - visibilityTimeout: int32(cfg.BatchMaxInterval.Seconds()), - batchMaxItems: cfg.BatchMaxItems, - errHandlingFn: errHandlingFnFromSQS(cfg.LogEntry), - logger: cfg.LogEntry, + payloadDownloader: manager.NewDownloader(s3client), + payloadBucket: cfg.largeEventsBucket, + visibilityTimeout: int32(cfg.BatchMaxInterval.Seconds()), + batchMaxItems: cfg.BatchMaxItems, + errHandlingFn: errHandlingFnFromSQS(cfg.LogEntry), + logger: cfg.LogEntry, + metricConsumerBatchProcessingDuration: metricConsumerBatchProcessingDuration, } err := collectCfg.CheckAndSetDefaults() if err != nil { @@ -286,12 +288,9 @@ func runWithMinInterval(ctx context.Context, fn func(context.Context) bool, minI func (c *consumer) processBatchOfEvents(ctx context.Context) (reachedMaxSize bool, e error) { start := time.Now() var size int - // TODO(tobiaszheller): we need some metrics to track it. - // And that log message should be deleted. 
defer func() { - if size > 0 { - c.logger.Debugf("Batch of %d messages processed in %s", size, time.Since(start)) - } + consumerLastProcessedTimestamp.SetToCurrentTime() + c.collectConfig.metricConsumerBatchProcessingDuration.Observe(time.Since(start).Seconds()) }() msgsCollector := newSqsMessagesCollector(c.collectConfig) @@ -304,6 +303,7 @@ func (c *consumer) processBatchOfEvents(ctx context.Context) (reachedMaxSize boo go func() { msgsCollector.fromSQS(readSQSCtx) }() + toDelete, err := c.writeToS3(ctx, msgsCollector.getEventsChan(), c.perDateFileParquetWriter) if err != nil { return false, trace.Wrap(err) @@ -339,6 +339,8 @@ type sqsCollectConfig struct { logger log.FieldLogger errHandlingFn func(ctx context.Context, errC chan error) + + metricConsumerBatchProcessingDuration prometheus.Histogram } func (cfg *sqsCollectConfig) CheckAndSetDefaults() error { @@ -383,6 +385,9 @@ func (cfg *sqsCollectConfig) CheckAndSetDefaults() error { if cfg.errHandlingFn == nil { return trace.BadParameter("errHandlingFn is not specified") } + if cfg.metricConsumerBatchProcessingDuration == nil { + return trace.BadParameter("metricConsumerBatchProcessingDuration is not specified") + } return nil } @@ -428,9 +433,9 @@ func (s *sqsMessagesCollector) fromSQS(ctx context.Context) { defer cancel() var ( - count int - countMu sync.Mutex - wg sync.WaitGroup + fullBatchMetadata collectedEventsMetadata + fullBatchMetadataMu sync.Mutex + wg sync.WaitGroup ) wg.Add(s.cfg.noOfWorkers) @@ -448,27 +453,63 @@ func (s *sqsMessagesCollector) fromSQS(ctx context.Context) { if deadline, ok := wokerCtx.Deadline(); ok && time.Until(deadline) <= s.cfg.waitOnReceiveDuration { return } - noOfReceived := s.receiveMessagesAndSendOnChan(wokerCtx, eventsC, errorsC) - if noOfReceived == 0 { + singleReceiveMetadata := s.receiveMessagesAndSendOnChan(wokerCtx, eventsC, errorsC) + if singleReceiveMetadata.Count == 0 { // no point of locking and checking for size if nothing was returned. 
continue } - countMu.Lock() - count += noOfReceived - if count >= s.cfg.batchMaxItems { - countMu.Unlock() + + fullBatchMetadataMu.Lock() + fullBatchMetadata.Merge(singleReceiveMetadata) + if fullBatchMetadata.Count >= s.cfg.batchMaxItems { + fullBatchMetadataMu.Unlock() cancel() return } - countMu.Unlock() + fullBatchMetadataMu.Unlock() } }(i) } wg.Wait() close(eventsC) + if fullBatchMetadata.Count > 0 { + consumerBatchCount.Add(float64(fullBatchMetadata.Count)) + consumerBatchSize.Observe(float64(fullBatchMetadata.Size)) + consumerAgeOfOldestProcessedMessage.Set(time.Since(fullBatchMetadata.OldestTimestamp).Seconds()) + } +} + +type collectedEventsMetadata struct { + // Size is total size of events. + Size int + // Count is number of events. + Count int + // OldestTimestamp is timestamp of oldest event. + OldestTimestamp time.Time +} + +// Merge combines the metadata of two collectedEventsMetadata instances. +// It updates the current instance by adding the size and count of events from another instance, +// and sets the oldestTimestamp to the oldest timestamp between the two instances. +func (c *collectedEventsMetadata) Merge(in collectedEventsMetadata) { + c.Size += in.Size + c.Count += in.Count + if c.OldestTimestamp.IsZero() || (!in.OldestTimestamp.IsZero() && c.OldestTimestamp.After(in.OldestTimestamp)) { + c.OldestTimestamp = in.OldestTimestamp + } } -func (s *sqsMessagesCollector) receiveMessagesAndSendOnChan(ctx context.Context, eventsC chan<- eventAndAckID, errorsC chan<- error) (size int) { +// MergeWithEvent combines collectedEventsMetadata with metadata of single event. 
+func (c *collectedEventsMetadata) MergeWithEvent(in apievents.AuditEvent) { + c.Merge(collectedEventsMetadata{ + // 1 because we are merging single event + Count: 1, + Size: in.Size(), + OldestTimestamp: in.GetTime(), + }) +} + +func (s *sqsMessagesCollector) receiveMessagesAndSendOnChan(ctx context.Context, eventsC chan<- eventAndAckID, errorsC chan<- error) collectedEventsMetadata { sqsOut, err := s.cfg.sqsReceiver.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{ QueueUrl: aws.String(s.cfg.queueURL), MaxNumberOfMessages: maxNumberOfMessagesFromReceive, @@ -479,7 +520,7 @@ func (s *sqsMessagesCollector) receiveMessagesAndSendOnChan(ctx context.Context, if err != nil { // We don't need handle canceled errors anyhow. if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - return 0 + return collectedEventsMetadata{} } errorsC <- trace.Wrap(err) @@ -487,15 +528,15 @@ func (s *sqsMessagesCollector) receiveMessagesAndSendOnChan(ctx context.Context, // on CPU if calls are contantly failing. 
select { case <-ctx.Done(): - return 0 + return collectedEventsMetadata{} case <-time.After(s.cfg.waitOnReceiveError): - return 0 + return collectedEventsMetadata{} } } if len(sqsOut.Messages) == 0 { - return 0 + return collectedEventsMetadata{} } - var noOfValidMessages int + var singleReceiveMetadata collectedEventsMetadata for _, msg := range sqsOut.Messages { event, err := s.auditEventFromSQSorS3(ctx, msg) if err != nil { @@ -506,9 +547,9 @@ func (s *sqsMessagesCollector) receiveMessagesAndSendOnChan(ctx context.Context, event: event, receiptHandle: aws.ToString(msg.ReceiptHandle), } - noOfValidMessages++ + singleReceiveMetadata.MergeWithEvent(event) } - return noOfValidMessages + return singleReceiveMetadata } // auditEventFromSQSorS3 returns events either directly from SQS message payload @@ -577,6 +618,7 @@ func errHandlingFnFromSQS(logger log.FieldLogger) func(ctx context.Context, errC if errorsCount > maxErrorCountForLogsOnSQSReceive { logger.Errorf("Got %d errors from SQS collector, printed only first %d", errorsCount, maxErrorCountForLogsOnSQSReceive) } + consumerNumberOfErrorsFromSQSCollect.Add(float64(errorsCount)) }() for { @@ -652,7 +694,6 @@ eventLoop: } pqtEvent, err := auditEventToParquet(eventAndAckID.event) if err != nil { - // TODO(tobiaszheller): come back and add some metrics here. c.logger.WithError(err).Error("Could not convert event to parquet format") continue } @@ -692,7 +733,10 @@ eventLoop: toDelete = append(toDelete, eventAndAckID.receiptHandle) } } - + eventLoopFinishedTime := time.Now() + defer func() { + consumerS3parquetFlushDuration.Observe(time.Since(eventLoopFinishedTime).Seconds()) + }() for _, pw := range perDateWriter { if err := pw.Close(); err != nil { // Typically there will be data just for one date. 
@@ -742,7 +786,10 @@ func (c *consumer) deleteMessagesFromQueue(ctx context.Context, handles []string if len(handles) == 0 { return nil } - + start := time.Now() + defer func() { + consumerDeleteMessageDuration.Observe(time.Since(start).Seconds()) + }() const ( // maxDeleteBatchSize defines maximum number of handles passed to deleteMessage endpoint, limited by AWS. maxDeleteBatchSize = 10 diff --git a/lib/events/athena/consumer_test.go b/lib/events/athena/consumer_test.go index e226981d7619f..514e04d718393 100644 --- a/lib/events/athena/consumer_test.go +++ b/lib/events/athena/consumer_test.go @@ -37,6 +37,7 @@ import ( "github.com/google/uuid" "github.com/gravitational/trace" "github.com/jonboulle/clockwork" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/xitongsys/parquet-go-source/local" "github.com/xitongsys/parquet-go/source" @@ -206,6 +207,7 @@ func validCollectCfgForTests(t *testing.T) sqsCollectConfig { t.Fail() } }, + metricConsumerBatchProcessingDuration: prometheus.NewHistogram(prometheus.HistogramOpts{Name: "for_tests"}), } } @@ -772,3 +774,75 @@ func (m *mockSQSDeleter) DeleteMessageBatch(ctx context.Context, params *sqs.Del m.noOfEntries += len(params.Entries) return m.respFn(ctx, params) } + +func TestCollectedEventsMetadataMerge(t *testing.T) { + now := time.Now() + tests := []struct { + name string + a collectedEventsMetadata + b collectedEventsMetadata + expected collectedEventsMetadata + }{ + { + name: "Merge with empty a", + a: collectedEventsMetadata{ + Size: 0, + Count: 0, + OldestTimestamp: time.Time{}, + }, + b: collectedEventsMetadata{ + Size: 10, + Count: 5, + OldestTimestamp: now, + }, + expected: collectedEventsMetadata{ + Size: 10, + Count: 5, + OldestTimestamp: now, + }, + }, + { + name: "Merge with empty b", + a: collectedEventsMetadata{ + Size: 10, + Count: 5, + OldestTimestamp: now, + }, + b: collectedEventsMetadata{ + Size: 0, + Count: 0, + OldestTimestamp: time.Time{}, + }, + 
expected: collectedEventsMetadata{ + Size: 10, + Count: 5, + OldestTimestamp: now, + }, + }, + { + name: "Merge with non-empty metadata", + a: collectedEventsMetadata{ + Size: 10, + Count: 5, + OldestTimestamp: now.Add(-time.Hour), + }, + b: collectedEventsMetadata{ + Size: 15, + Count: 7, + OldestTimestamp: now, + }, + expected: collectedEventsMetadata{ + Size: 25, + Count: 12, + OldestTimestamp: now.Add(-time.Hour), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.a.Merge(tt.b) + require.Empty(t, cmp.Diff(tt.a, tt.expected)) + }) + } +} diff --git a/metrics.go b/metrics.go index 4139562075f51..7574a066b672e 100644 --- a/metrics.go +++ b/metrics.go @@ -278,3 +278,26 @@ const ( // submission buffer reaching a length limit. MetricUsageEventsDropped = "usage_events_dropped_total" ) + +// athena audit log metrics +const ( + // MetricParquetlogConsumerBatchPorcessingDuration is a histogram of durations it + // took to process single batch of events. + MetricParquetlogConsumerBatchPorcessingDuration = "audit_parquetlog_batch_processing_seconds" + // MetricParquetlogConsumerS3FlushDuration is a histogram of durations it took to + // flush and close parquet files on s3. + MetricParquetlogConsumerS3FlushDuration = "audit_parquetlog_s3_flush_seconds" + // MetricParquetlogConsumerDeleteEventsDuration is a histogram of durations it + // took to delete events from SQS. + MetricParquetlogConsumerDeleteEventsDuration = "audit_parquetlog_delete_events_seconds" + // MetricParquetlogConsumerBatchSize is a histogram of sizes of single batch of events. + MetricParquetlogConsumerBatchSize = "audit_parquetlog_batch_size" + // MetricParquetlogConsumerBatchCount is a count of number of events in single batch. + MetricParquetlogConsumerBatchCount = "audit_parquetlog_batch_count" + // MetricParquetlogConsumerLastProcessedTimestamp is a timestamp of last finished consumer execution. 
+	MetricParquetlogConsumerLastProcessedTimestamp = "audit_parquetlog_last_processed_timestamp"
+	// MetricParquetlogConsumerOldestProcessedMessage is age of oldest processed message.
+	MetricParquetlogConsumerOldestProcessedMessage = "audit_parquetlog_age_oldest_processed_message"
+	// MetricParquetlogConsumerCollectFailed is a count of number of errors received from sqs collect.
+	MetricParquetlogConsumerCollectFailed = "audit_parquetlog_errors_from_collect_count"
+)