1 change: 1 addition & 0 deletions docs/cspell.json
@@ -554,6 +554,7 @@
"pagerdutyapikey",
"pagerdutyapikeyfromsecret",
"pastable",
"parquetlog",
"pasteable",
"pgaadauth",
"pidof",
118 changes: 63 additions & 55 deletions docs/pages/includes/metrics.mdx

Large diffs are not rendered by default.

100 changes: 99 additions & 1 deletion lib/events/athena/athena.go
@@ -27,12 +27,14 @@ import (
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"

"github.com/gravitational/teleport"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/observability/metrics"
"github.com/gravitational/teleport/lib/utils"
)

@@ -361,6 +363,14 @@ func New(ctx context.Context, cfg Config) (*Log, error) {
return nil, trace.Wrap(err)
}

// metricConsumerBatchProcessingDuration is defined after checking the config, because
// its buckets depend on BatchMaxInterval.
metricConsumerBatchProcessingDuration := metricConsumerBatchProcessingDuration(cfg.BatchMaxInterval)

if err := metrics.RegisterPrometheusCollectors(append(prometheusCollectors, metricConsumerBatchProcessingDuration)...); err != nil {
return nil, trace.Wrap(err)
}

querier, err := newQuerier(querierConfig{
tablename: cfg.TableName,
database: cfg.Database,
@@ -377,7 +387,7 @@ func New(ctx context.Context, cfg Config) (*Log, error) {

consumerCtx, consumerCancel := context.WithCancel(ctx)

consumer, err := newConsumer(cfg, consumerCancel)
consumer, err := newConsumer(cfg, consumerCancel, metricConsumerBatchProcessingDuration)
if err != nil {
return nil, trace.Wrap(err)
}
@@ -425,3 +435,91 @@ func isValidUrlWithScheme(s string) (string, bool) {
}
return u.Scheme, true
}

func metricConsumerBatchProcessingDuration(batchInterval time.Duration) prometheus.Histogram {
batchSeconds := batchInterval.Seconds()
return prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: teleport.MetricNamespace,
Name: teleport.MetricParquetlogConsumerBatchPorcessingDuration,
Help: "Duration of processing a single batch of events in parquetlog",
// For a 60s batch interval the buckets will look like:
// 6.00, 12.00, 30.00, 45.00, 54.00, 59.01, 64.48, 70.47, 77.01, 84.15, 91.96, 100.49, 109.81, 120.00
// We want some visibility if a batch takes a very small amount of time, but we are mostly
// interested in the range from 0.9*batch to 2*batch.
Buckets: append([]float64{0.1 * batchSeconds, 0.2 * batchSeconds, 0.5 * batchSeconds, 0.75 * batchSeconds}, prometheus.ExponentialBucketsRange(0.9*batchSeconds, 2*batchSeconds, 10)...),
},
)
}

var (
consumerS3parquetFlushDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: teleport.MetricNamespace,
Name: teleport.MetricParquetlogConsumerS3FlushDuration,
Help: "Duration of flush and close of s3 parquet files in parquetlog",
// Lowest bucket upper bound is 0.001 sec (1 ms), growing with a factor of 2;
// highest bucket upper bound is 0.001 sec * 2^15 == 32.768 sec.
Buckets: prometheus.ExponentialBuckets(0.001, 2, 16),
},
)

consumerDeleteMessageDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: teleport.MetricNamespace,
Name: teleport.MetricParquetlogConsumerDeleteEventsDuration,
Help: "Duration of deletion of events on SQS in parquetlog",
// Lowest bucket upper bound is 0.001 sec (1 ms), growing with a factor of 2;
// highest bucket upper bound is 0.001 sec * 2^15 == 32.768 sec.
Buckets: prometheus.ExponentialBuckets(0.001, 2, 16),
},
)

consumerBatchSize = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: teleport.MetricNamespace,
Name: teleport.MetricParquetlogConsumerBatchSize,
Help: "Size of a single batch of events in parquetlog",
Buckets: prometheus.ExponentialBucketsRange(200, 100*1024*1024 /* 100 MB*/, 10),
},
)

consumerBatchCount = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: teleport.MetricNamespace,
Name: teleport.MetricParquetlogConsumerBatchCount,
Help: "Number of events in a single batch in parquetlog",
},
)

consumerLastProcessedTimestamp = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: teleport.MetricNamespace,
Name: teleport.MetricParquetlogConsumerLastProcessedTimestamp,
Help: "Timestamp of last finished consumer execution",
},
)

consumerAgeOfOldestProcessedMessage = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: teleport.MetricNamespace,
Name: teleport.MetricParquetlogConsumerOldestProcessedMessage,
Help: "Age of oldest processed message in seconds",
},
)

consumerNumberOfErrorsFromSQSCollect = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: teleport.MetricNamespace,
Name: teleport.MetricParquetlogConsumerCollectFailed,
Help: "Number of errors received from sqs collect",
},
)

prometheusCollectors = []prometheus.Collector{
consumerS3parquetFlushDuration, consumerDeleteMessageDuration,
consumerBatchSize, consumerBatchCount,
consumerLastProcessedTimestamp, consumerAgeOfOldestProcessedMessage,
consumerNumberOfErrorsFromSQSCollect,
}
)
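For orientation (not part of the PR), here is a minimal, self-contained sketch of what the dynamic bucket layout above produces for a 60s BatchMaxInterval. It only assumes the prometheus client_golang package already imported in the diff; the printed values are approximate:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Assume a 60s BatchMaxInterval, as in the comment in the diff.
	batchSeconds := 60.0

	// Four fixed low-end buckets give visibility into unusually fast batches;
	// ten exponential buckets cover the interesting range 0.9*batch..2*batch.
	buckets := append(
		[]float64{0.1 * batchSeconds, 0.2 * batchSeconds, 0.5 * batchSeconds, 0.75 * batchSeconds},
		prometheus.ExponentialBucketsRange(0.9*batchSeconds, 2*batchSeconds, 10)...,
	)

	for _, b := range buckets {
		fmt.Printf("%.2f ", b)
	}
	fmt.Println()
	// Prints roughly:
	// 6.00 12.00 30.00 45.00 54.00 59.01 64.48 70.47 77.01 84.15 91.96 100.49 109.81 120.00
}
```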
113 changes: 80 additions & 33 deletions lib/events/athena/consumer.go
@@ -31,6 +31,7 @@ import (
sqsTypes "github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/google/uuid"
"github.com/gravitational/trace"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"github.com/xitongsys/parquet-go-source/s3v2"
"github.com/xitongsys/parquet-go/parquet"
@@ -98,20 +99,21 @@ type s3downloader interface {
Download(ctx context.Context, w io.WriterAt, input *s3.GetObjectInput, options ...func(*manager.Downloader)) (n int64, err error)
}

func newConsumer(cfg Config, cancelFn context.CancelFunc) (*consumer, error) {
func newConsumer(cfg Config, cancelFn context.CancelFunc, metricConsumerBatchProcessingDuration prometheus.Histogram) (*consumer, error) {
s3client := s3.NewFromConfig(*cfg.AWSConfig)
sqsClient := sqs.NewFromConfig(*cfg.AWSConfig)

collectCfg := sqsCollectConfig{
sqsReceiver: sqsClient,
queueURL: cfg.QueueURL,
// TODO(tobiaszheller): use s3 manager from teleport observability.
payloadDownloader: manager.NewDownloader(s3client),
payloadBucket: cfg.largeEventsBucket,
visibilityTimeout: int32(cfg.BatchMaxInterval.Seconds()),
batchMaxItems: cfg.BatchMaxItems,
errHandlingFn: errHandlingFnFromSQS(cfg.LogEntry),
logger: cfg.LogEntry,
payloadDownloader: manager.NewDownloader(s3client),
payloadBucket: cfg.largeEventsBucket,
visibilityTimeout: int32(cfg.BatchMaxInterval.Seconds()),
batchMaxItems: cfg.BatchMaxItems,
errHandlingFn: errHandlingFnFromSQS(cfg.LogEntry),
logger: cfg.LogEntry,
metricConsumerBatchProcessingDuration: metricConsumerBatchProcessingDuration,
}
err := collectCfg.CheckAndSetDefaults()
if err != nil {
@@ -286,12 +288,9 @@ func runWithMinInterval(ctx context.Context, fn func(context.Context) bool, minI
func (c *consumer) processBatchOfEvents(ctx context.Context) (reachedMaxSize bool, e error) {
start := time.Now()
var size int
// TODO(tobiaszheller): we need some metrics to track it.
// And that log message should be deleted.
defer func() {
if size > 0 {
c.logger.Debugf("Batch of %d messages processed in %s", size, time.Since(start))
}
consumerLastProcessedTimestamp.SetToCurrentTime()
c.collectConfig.metricConsumerBatchProcessingDuration.Observe(time.Since(start).Seconds())
}()

msgsCollector := newSqsMessagesCollector(c.collectConfig)
@@ -304,6 +303,7 @@
go func() {
msgsCollector.fromSQS(readSQSCtx)
}()

toDelete, err := c.writeToS3(ctx, msgsCollector.getEventsChan(), c.perDateFileParquetWriter)
if err != nil {
return false, trace.Wrap(err)
@@ -339,6 +339,8 @@ type sqsCollectConfig struct {

logger log.FieldLogger
errHandlingFn func(ctx context.Context, errC chan error)

metricConsumerBatchProcessingDuration prometheus.Histogram
}

func (cfg *sqsCollectConfig) CheckAndSetDefaults() error {
@@ -383,6 +385,9 @@ func (cfg *sqsCollectConfig) CheckAndSetDefaults() error {
if cfg.errHandlingFn == nil {
return trace.BadParameter("errHandlingFn is not specified")
}
if cfg.metricConsumerBatchProcessingDuration == nil {
return trace.BadParameter("metricConsumerBatchProcessingDuration is not specified")
}
return nil
}

@@ -428,9 +433,9 @@ func (s *sqsMessagesCollector) fromSQS(ctx context.Context) {
defer cancel()

var (
count int
countMu sync.Mutex
wg sync.WaitGroup
fullBatchMetadata collectedEventsMetadata
fullBatchMetadataMu sync.Mutex
wg sync.WaitGroup
)

wg.Add(s.cfg.noOfWorkers)
@@ -448,27 +453,63 @@
if deadline, ok := wokerCtx.Deadline(); ok && time.Until(deadline) <= s.cfg.waitOnReceiveDuration {
return
}
noOfReceived := s.receiveMessagesAndSendOnChan(wokerCtx, eventsC, errorsC)
if noOfReceived == 0 {
singleReceiveMetadata := s.receiveMessagesAndSendOnChan(wokerCtx, eventsC, errorsC)
if singleReceiveMetadata.Count == 0 {
// No point in locking and checking the size if nothing was returned.
continue
}
countMu.Lock()
count += noOfReceived
if count >= s.cfg.batchMaxItems {
countMu.Unlock()

fullBatchMetadataMu.Lock()
fullBatchMetadata.Merge(singleReceiveMetadata)
if fullBatchMetadata.Count >= s.cfg.batchMaxItems {
fullBatchMetadataMu.Unlock()
cancel()
return
}
countMu.Unlock()
fullBatchMetadataMu.Unlock()
}
}(i)
}
wg.Wait()
close(eventsC)
if fullBatchMetadata.Count > 0 {
consumerBatchCount.Add(float64(fullBatchMetadata.Count))
consumerBatchSize.Observe(float64(fullBatchMetadata.Size))
consumerAgeOfOldestProcessedMessage.Set(time.Since(fullBatchMetadata.OldestTimestamp).Seconds())
}
}

type collectedEventsMetadata struct {
// Size is the total size of events.
Size int
// Count is the number of events.
Count int
// OldestTimestamp is the timestamp of the oldest event.
OldestTimestamp time.Time
}

// Merge combines the metadata of two collectedEventsMetadata instances.
// It updates the current instance by adding the size and count of events from another instance,
// and sets OldestTimestamp to the oldest timestamp of the two instances.
func (c *collectedEventsMetadata) Merge(in collectedEventsMetadata) {
c.Size += in.Size
c.Count += in.Count
if c.OldestTimestamp.IsZero() || (!in.OldestTimestamp.IsZero() && c.OldestTimestamp.After(in.OldestTimestamp)) {
c.OldestTimestamp = in.OldestTimestamp
}
}

func (s *sqsMessagesCollector) receiveMessagesAndSendOnChan(ctx context.Context, eventsC chan<- eventAndAckID, errorsC chan<- error) (size int) {
// MergeWithEvent combines collectedEventsMetadata with the metadata of a single event.
func (c *collectedEventsMetadata) MergeWithEvent(in apievents.AuditEvent) {
c.Merge(collectedEventsMetadata{
// 1 because we are merging a single event
Count: 1,
Size: in.Size(),
OldestTimestamp: in.GetTime(),
})
}

func (s *sqsMessagesCollector) receiveMessagesAndSendOnChan(ctx context.Context, eventsC chan<- eventAndAckID, errorsC chan<- error) collectedEventsMetadata {
sqsOut, err := s.cfg.sqsReceiver.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
QueueUrl: aws.String(s.cfg.queueURL),
MaxNumberOfMessages: maxNumberOfMessagesFromReceive,
@@ -479,23 +520,23 @@ func (s *sqsMessagesCollector) receiveMessagesAndSendOnChan(ctx context.Context,
if err != nil {
// We don't need to handle canceled errors anyhow.
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return 0
return collectedEventsMetadata{}
}
errorsC <- trace.Wrap(err)

// We don't want to retry receiving messages immediately, to prevent huge load
// on CPU if calls are constantly failing.
select {
case <-ctx.Done():
return 0
return collectedEventsMetadata{}
case <-time.After(s.cfg.waitOnReceiveError):
return 0
return collectedEventsMetadata{}
}
}
if len(sqsOut.Messages) == 0 {
return 0
return collectedEventsMetadata{}
}
var noOfValidMessages int
var singleReceiveMetadata collectedEventsMetadata
for _, msg := range sqsOut.Messages {
event, err := s.auditEventFromSQSorS3(ctx, msg)
if err != nil {
@@ -506,9 +547,9 @@ func (s *sqsMessagesCollector) receiveMessagesAndSendOnChan(ctx context.Context,
event: event,
receiptHandle: aws.ToString(msg.ReceiptHandle),
}
noOfValidMessages++
singleReceiveMetadata.MergeWithEvent(event)
}
return noOfValidMessages
return singleReceiveMetadata
}

// auditEventFromSQSorS3 returns events either directly from SQS message payload
@@ -577,6 +618,7 @@ func errHandlingFnFromSQS(logger log.FieldLogger) func(ctx context.Context, errC
if errorsCount > maxErrorCountForLogsOnSQSReceive {
logger.Errorf("Got %d errors from SQS collector, printed only first %d", errorsCount, maxErrorCountForLogsOnSQSReceive)
}
consumerNumberOfErrorsFromSQSCollect.Add(float64(errorsCount))
}()

for {
@@ -652,7 +694,6 @@
}
pqtEvent, err := auditEventToParquet(eventAndAckID.event)
if err != nil {
// TODO(tobiaszheller): come back and add some metrics here.
c.logger.WithError(err).Error("Could not convert event to parquet format")
continue
}
@@ -692,7 +733,10 @@
toDelete = append(toDelete, eventAndAckID.receiptHandle)
}
}

eventLoopFinishedTime := time.Now()
defer func() {
consumerS3parquetFlushDuration.Observe(time.Since(eventLoopFinishedTime).Seconds())
}()
for _, pw := range perDateWriter {
if err := pw.Close(); err != nil {
// Typically there will be data just for one date.
@@ -742,7 +786,10 @@ func (c *consumer) deleteMessagesFromQueue(ctx context.Context, handles []string
if len(handles) == 0 {
return nil
}

start := time.Now()
defer func() {
consumerDeleteMessageDuration.Observe(time.Since(start).Seconds())
}()
const (
// maxDeleteBatchSize defines maximum number of handles passed to deleteMessage endpoint, limited by AWS.
maxDeleteBatchSize = 10
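As a reading aid (again not part of the PR), here is a minimal sketch of how the new collectedEventsMetadata.Merge helper aggregates per-receive metadata before the batch metrics are updated. The struct and Merge body mirror the diff above; the sample receive values are invented:

```go
package main

import (
	"fmt"
	"time"
)

// Mirrors the struct added in consumer.go.
type collectedEventsMetadata struct {
	Size            int
	Count           int
	OldestTimestamp time.Time
}

// Merge adds sizes and counts and keeps the oldest non-zero timestamp.
func (c *collectedEventsMetadata) Merge(in collectedEventsMetadata) {
	c.Size += in.Size
	c.Count += in.Count
	if c.OldestTimestamp.IsZero() || (!in.OldestTimestamp.IsZero() && c.OldestTimestamp.After(in.OldestTimestamp)) {
		c.OldestTimestamp = in.OldestTimestamp
	}
}

func main() {
	now := time.Now()

	// Hypothetical per-receive metadata, e.g. returned by two SQS workers.
	receives := []collectedEventsMetadata{
		{Size: 2048, Count: 4, OldestTimestamp: now.Add(-90 * time.Second)},
		{Size: 512, Count: 1, OldestTimestamp: now.Add(-30 * time.Second)},
	}

	var fullBatch collectedEventsMetadata
	for _, r := range receives {
		fullBatch.Merge(r)
	}

	// These values would feed consumerBatchCount, consumerBatchSize and
	// consumerAgeOfOldestProcessedMessage in the diff above.
	fmt.Println("count:", fullBatch.Count) // 5
	fmt.Println("size:", fullBatch.Size)   // 2560
	fmt.Println("oldest age:", time.Since(fullBatch.OldestTimestamp).Round(time.Second)) // ~1m30s
}
```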