From c025653c6559f439f65a9b74c7d18319878fff12 Mon Sep 17 00:00:00 2001 From: Tobiasz Heller Date: Tue, 28 Mar 2023 12:33:26 +0200 Subject: [PATCH 1/9] athena audit logs - config --- constants.go | 11 +- lib/events/athena/athena.go | 355 +++++++++++++++++++++++++++++++ lib/events/athena/athena_test.go | 218 +++++++++++++++++++ lib/service/service.go | 15 ++ 4 files changed, 595 insertions(+), 4 deletions(-) create mode 100644 lib/events/athena/athena.go create mode 100644 lib/events/athena/athena_test.go diff --git a/constants.go b/constants.go index 86640f05a9c9f..a4b8659c5d792 100644 --- a/constants.go +++ b/constants.go @@ -261,6 +261,9 @@ const ( // ComponentUsageReporting is the component responsible for reporting usage metrics. ComponentUsageReporting = "usage-reporting" + // ComponentAthena represents athena clients. + ComponentAthena = "athena" + // VerboseLogEnvVar forces all logs to be verbose (down to DEBUG level) VerboseLogsEnvVar = "TELEPORT_DEBUG" @@ -311,11 +314,11 @@ const ( // DirMaskSharedGroup is the mask for a directory accessible // by the owner and group - DirMaskSharedGroup = 0770 + DirMaskSharedGroup = 0o770 // FileMaskOwnerOnly is the file mask that allows read write access // to owers only - FileMaskOwnerOnly = 0600 + FileMaskOwnerOnly = 0o600 // On means mode is on On = "on" @@ -619,10 +622,10 @@ const ( const ( // SharedDirMode is a mode for a directory shared with group - SharedDirMode = 0750 + SharedDirMode = 0o750 // PrivateDirMode is a mode for private directories - PrivateDirMode = 0700 + PrivateDirMode = 0o700 ) const ( diff --git a/lib/events/athena/athena.go b/lib/events/athena/athena.go new file mode 100644 index 0000000000000..91563a50dcc1f --- /dev/null +++ b/lib/events/athena/athena.go @@ -0,0 +1,355 @@ +package athena + +import ( + "context" + "math" + "net/url" + "regexp" + "strconv" + "strings" + "time" + + "github.com/aws/aws-sdk-go/aws" + awssession "github.com/aws/aws-sdk-go/aws/session" + 
"github.com/gravitational/trace" + "github.com/jonboulle/clockwork" + log "github.com/sirupsen/logrus" + + "github.com/gravitational/teleport" + "github.com/gravitational/teleport/api/types" + apievents "github.com/gravitational/teleport/api/types/events" + "github.com/gravitational/teleport/lib/events" + "github.com/gravitational/teleport/lib/session" + "github.com/gravitational/teleport/lib/utils" +) + +const ( + // TODO(tobiaszheller): move to batcher.go in other PR. + // maxWaitTimeOnReceiveMessageFromSQSInSeconds defines how long single + // receiveFromQueue will wait if there is no max events (10). + maxWaitTimeOnReceiveMessageFromSQSInSeconds = 5 +) + +// Config structure represents Athena configuration. +// Right now the only way to set config is via url params. +type Config struct { + // Region is where Athena, SQS and SNS lives (required). + Region string + + // Publisher settings. + + // TopicARN where to emit events in SNS (required). + TopicARN string + // LargeEventsS3 is location on S3 where temporary large events (>256KB) + // are stored before converting it to Parquet and moving to long term + // storage (required). + LargeEventsS3 string + + // Query settings. + + // Database is name of Glue Database that Athena will query against (required). + Database string + // TableName is name of Glue Table that Athena will query against (required). + TableName string + // LocationS3 is location on S3 where Parquet files partitioned by date are + // stored (required). + LocationS3 string + // QueryResultsS3 is location on S3 where Athena stored query results (optional). + // Default results path can be defined by in workgroup settings. + QueryResultsS3 string + // Workgroup is Glue workgroup where Athena queries are executed (optional). + Workgroup string + // GetQueryResultsSleepTime is used to define how long query will wait before + // checking again for results status if previous status was not ready (optional). 
+ GetQueryResultsSleepTime time.Duration + // LimiterRate defines rate at which search_event rate limiter is filled (optional). + LimiterRate float64 + // LimiterBurst defines rate limit bucket capacity (optional). + LimiterBurst int + + // Batcher settings. + + // QueueUrl is URL of SQS, which is set as subscriber to SNS topic (required). + QueueUrl string + // BatchMaxItems defines how many items can be stored in single Parquet + // batch (optional). + // It's soft limit. + BatchMaxItems int + // BatchMaxInterval defines the interval at which parquet files will be created (optional). + BatchMaxInterval time.Duration + + // Clock is a clock interface, used in tests. + Clock clockwork.Clock + // UIDGenerator is unique ID generator. + UIDGenerator utils.UID + + // TODO(tobiaszheller): add FIPS config in later phase. +} + +// CheckAndSetDefaults is a helper returns an error if the supplied configuration +// is not enough to connect to SNS +func (cfg *Config) CheckAndSetDefaults() error { + const glueNameMaxLen = 255 + if cfg.Database == "" { + return trace.BadParameter("Database is not specified") + } + if len(cfg.Database) > glueNameMaxLen { + return trace.BadParameter("Database name too long") + } + if !isAlphanumericOrUnderscore(cfg.Database) { + return trace.BadParameter("Database name can contains only alphanumeric or underscore characters") + } + + if cfg.TableName == "" { + return trace.BadParameter("TableName is not specified") + } + if len(cfg.TableName) > glueNameMaxLen { + return trace.BadParameter("TableName too long") + } + // TableName is appended directly to athena query. That's why we put extra care + // that no weird chars are passed here.
+ if !isAlphanumericOrUnderscore(cfg.TableName) { + return trace.BadParameter("TableName can contains only alphanumeric or underscore characters") + } + + if cfg.TopicARN == "" { + return trace.BadParameter("TopicARN is not specified") + } + + if cfg.LocationS3 == "" { + return trace.BadParameter("LocationS3 is not specified") + } + if scheme, ok := isValidUrlWithScheme(cfg.LocationS3); !ok || scheme != "s3" { + return trace.BadParameter("LocationS3 must be valid url and start with s3") + } + + if cfg.LargeEventsS3 == "" { + return trace.BadParameter("LargeEventsS3 is not specified") + } + if scheme, ok := isValidUrlWithScheme(cfg.LargeEventsS3); !ok || scheme != "s3" { + return trace.BadParameter("LargeEventsS3 must be valid url and start with s3") + } + + if cfg.QueueUrl == "" { + return trace.BadParameter("QueueUrl is not specified") + } + if scheme, ok := isValidUrlWithScheme(cfg.QueueUrl); !ok || scheme != "https" { + return trace.BadParameter("QueueUrl must be valid url and start with https") + } + + if cfg.GetQueryResultsSleepTime == 0 { + cfg.GetQueryResultsSleepTime = 100 * time.Millisecond + } + + if cfg.BatchMaxItems == 0 { + // 20000 items, per average 500B event size = 10MB + cfg.BatchMaxItems = 20000 + } + + if cfg.BatchMaxInterval == 0 { + cfg.BatchMaxInterval = 1 * time.Minute + } + + if cfg.BatchMaxInterval < maxWaitTimeOnReceiveMessageFromSQSInSeconds*time.Second { + // If BatchMaxInterval is shorter it will mean we will cancel all + // requests when there is less messages than 10 on queue. + // This can be fixed by shortening timeout on read, but realisticly + // no-one should use that short interval, so it's easier to check here. + // For high load operation, BatchMaxItems will happen first.
+ return trace.BadParameter("BatchMaxInterval too short, must be greater than 5s") + } + + if cfg.LimiterRate < 0 { + return trace.BadParameter("LimiterRate cannot be nagative") + } + if cfg.LimiterBurst < 0 { + return trace.BadParameter("LimiterBurst cannot be nagative") + } + + if cfg.LimiterRate > 0 && cfg.LimiterBurst == 0 { + return trace.BadParameter("LimiterBurst must be greater than 0 if LimiterRate is used") + } + + if cfg.LimiterBurst > 0 && math.Abs(cfg.LimiterRate) < 1e-9 { + return trace.BadParameter("LimiterRate must be greater than 0 if LimiterBurst is used") + } + + if cfg.Clock == nil { + cfg.Clock = clockwork.NewRealClock() + } + if cfg.UIDGenerator == nil { + cfg.UIDGenerator = utils.NewRealUID() + } + + return nil +} + +// SetFromURL establishes values on an EventsConfig from the supplied URI +func (cfg *Config) SetFromURL(url *url.URL) error { + splitted := strings.Split(url.Host, ".") + if len(splitted) != 2 { + return trace.BadParameter("invalid athena address, supported format is 'athena://database.table', got %q", url.Host) + } + cfg.Database, cfg.TableName = splitted[0], splitted[1] + + topicARN := url.Query().Get("topicArn") + if topicARN != "" { + cfg.TopicARN = topicARN + } + largeEventsS3 := url.Query().Get("largeEventsS3") + if largeEventsS3 != "" { + cfg.LargeEventsS3 = largeEventsS3 + } + + locationS3 := url.Query().Get("locationS3") + if locationS3 != "" { + cfg.LocationS3 = locationS3 + } + queryResultsS3 := url.Query().Get("queryResultsS3") + if queryResultsS3 != "" { + cfg.QueryResultsS3 = queryResultsS3 + } + workgroup := url.Query().Get("workgroup") + if workgroup != "" { + cfg.Workgroup = workgroup + } + getQueryResultsSleepTime := url.Query().Get("getQueryResultsSleepTime") + if getQueryResultsSleepTime != "" { + dur, err := time.ParseDuration(getQueryResultsSleepTime) + if err != nil { + return trace.BadParameter("invalid getQueryResultsSleepTime value: %v", err) + } + cfg.GetQueryResultsSleepTime = dur + } + rateInString 
:= url.Query().Get("limiterRate") + if rateInString != "" { + rate, err := strconv.ParseFloat(rateInString, 32) + if err != nil { + return trace.BadParameter("invalid limiterRate value (it must be float32): %v", err) + } + cfg.LimiterRate = rate + } + burstInString := url.Query().Get("limiterBurst") + if burstInString != "" { + burst, err := strconv.Atoi(burstInString) + if err != nil { + return trace.BadParameter("invalid limiterBurst value (it must be int): %v", err) + } + cfg.LimiterBurst = burst + } + + queueURL := url.Query().Get("queueURL") + if queueURL != "" { + cfg.QueueUrl = queueURL + } + batchMaxItems := url.Query().Get("batchMaxItems") + if batchMaxItems != "" { + intMaxItems, err := strconv.Atoi(batchMaxItems) + if err != nil { + return trace.BadParameter("invalid batchMaxItems value (it must be int): %v", err) + } + cfg.BatchMaxItems = intMaxItems + } + batchMaxInterval := url.Query().Get("batchMaxInterval") + if batchMaxInterval != "" { + dur, err := time.ParseDuration(batchMaxInterval) + if err != nil { + return trace.BadParameter("invalid batchMaxInterval value: %v", err) + } + cfg.BatchMaxInterval = dur + } + + return nil +} + +// Log is a aws storage of events. +type Log struct { + // Entry is a log entry + *log.Entry + // Config is a backend configuration + Config + + // session holds the AWS client. + session *awssession.Session +} + +// New returns new instance of athena based audit logger. +func New(ctx context.Context, cfg Config) (*Log, error) { + err := cfg.CheckAndSetDefaults() + if err != nil { + return nil, trace.Wrap(err) + } + logEntry := log.WithFields(log.Fields{ + trace.Component: teleport.ComponentAthena, + }) + l := &Log{ + Entry: logEntry, + Config: cfg, + } + // Create an AWS session using default SDK behavior, i.e. it will interpret + // the environment and ~/.aws directory just like an AWS CLI tool would. 
+ l.session, err = awssession.NewSessionWithOptions(awssession.Options{ + SharedConfigState: awssession.SharedConfigEnable, + }) + if err != nil { + return nil, trace.Wrap(err) + } + // override the default environment (region + credentials) with the values + // from the config. + if cfg.Region != "" { + l.session.Config.Region = aws.String(cfg.Region) + } + + // TODO(tobiaszheller): initialize publisher + // TODO(tobiaszheller): initialize batcher + // TODO(tobiaszheller): initialize querier + + return l, nil +} + +func (l *Log) EmitAuditEvent(ctx context.Context, in apievents.AuditEvent) error { + return trace.NotImplemented("not implemented") +} + +func (l *Log) GetSessionChunk(namespace string, sid session.ID, offsetBytes, maxBytes int) ([]byte, error) { + return nil, trace.NotImplemented("not implemented") +} + +func (l *Log) GetSessionEvents(namespace string, sid session.ID, after int, includePrintEvents bool) ([]events.EventFields, error) { + return nil, trace.NotImplemented("not implemented") +} + +func (l *Log) SearchEvents(fromUTC, toUTC time.Time, namespace string, eventTypes []string, limit int, order types.EventOrder, startKey string) ([]apievents.AuditEvent, string, error) { + return nil, "", trace.NotImplemented("not implemented") +} + +func (l *Log) SearchSessionEvents(fromUTC, toUTC time.Time, limit int, order types.EventOrder, startKey string, cond *types.WhereExpr, sessionID string) ([]apievents.AuditEvent, string, error) { + return nil, "", trace.NotImplemented("not implemented") +} + +func (l *Log) Close() error { + return nil +} + +func (l *Log) StreamSessionEvents(ctx context.Context, sessionID session.ID, startIndex int64) (chan apievents.AuditEvent, chan error) { + c, e := make(chan apievents.AuditEvent), make(chan error, 1) + e <- trace.NotImplemented("not implemented") + return c, e +} + +func isAlphanumericOrUnderscore(s string) bool { + pattern := "^[a-zA-Z0-9_]+$" + re := regexp.MustCompile(pattern) + return re.MatchString(s) +} + 
+func isValidUrlWithScheme(s string) (string, bool) { + u, err := url.Parse(s) + if err != nil { + return "", false + } + if u.Scheme == "" { + return "", false + } + return u.Scheme, true +} diff --git a/lib/events/athena/athena_test.go b/lib/events/athena/athena_test.go new file mode 100644 index 0000000000000..2f0b44ae3d2f2 --- /dev/null +++ b/lib/events/athena/athena_test.go @@ -0,0 +1,218 @@ +package athena + +import ( + "net/url" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/stretchr/testify/require" +) + +func TestConfig_SetFromURL(t *testing.T) { + tests := []struct { + name string + url string + want Config + wantErr string + }{ + { + name: "params to emiter", + url: "athena://db.tbl/?topicArn=arn:topic&largeEventsS3=s3://large-events-bucket", + want: Config{ + TableName: "tbl", + Database: "db", + TopicARN: "arn:topic", + LargeEventsS3: "s3://large-events-bucket", + }, + }, + { + name: "params to querier - part 1", + url: "athena://db.tbl/?locationS3=s3://events-bucket&queryResultsS3=s3://results-bucket&workgroup=default", + want: Config{ + TableName: "tbl", + Database: "db", + LocationS3: "s3://events-bucket", + QueryResultsS3: "s3://results-bucket", + Workgroup: "default", + }, + }, + { + name: "params to querier - part 2", + url: "athena://db.tbl/?getQueryResultsSleepTime=200ms&limiterRate=0.642&limiterBurst=3", + want: Config{ + TableName: "tbl", + Database: "db", + GetQueryResultsSleepTime: 200 * time.Millisecond, + LimiterRate: 0.642, + LimiterBurst: 3, + }, + }, + { + name: "params to batcher", + url: "athena://db.tbl/?queueURL=https://queueURL&batchMaxItems=1000&batchMaxInterval=10s", + want: Config{ + TableName: "tbl", + Database: "db", + QueueUrl: "https://queueURL", + BatchMaxItems: 1000, + BatchMaxInterval: 10 * time.Second, + }, + }, + { + name: "invalid database/table format", + url: "athena://dsa.dsa.dsa", + wantErr: "invalid athena address, supported format is 
'athena://database.table'", + }, + { + name: "invalid limiterRate format", + url: "athena://db.tbl/?limiterRate=abc", + wantErr: "invalid limiterRate value (it must be float32)", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := &Config{} + u, err := url.Parse(tt.url) + require.NoError(t, err, "Failed to parse url") + err = cfg.SetFromURL(u) + if tt.wantErr == "" { + require.NoError(t, err, "SetFromURL return unexpected err") + require.Empty(t, cmp.Diff(tt.want, *cfg, cmpopts.EquateApprox(0, 0.0001))) + } else { + require.ErrorContains(t, err, tt.wantErr) + } + }) + } +} + +func TestConfig_CheckAndSetDefaults(t *testing.T) { + validConfig := Config{ + Database: "db", + TableName: "tbl", + TopicARN: "arn:topic", + LargeEventsS3: "s3://large-payloads-bucket", + LocationS3: "s3://events-bucket", + QueueUrl: "https://queue-url", + } + tests := []struct { + name string + input func() Config + want Config + wantErr string + }{ + { + name: "minimum config with defaults", + input: func() Config { + return validConfig + }, + want: Config{ + Database: "db", + TableName: "tbl", + TopicARN: "arn:topic", + LargeEventsS3: "s3://large-payloads-bucket", + LocationS3: "s3://events-bucket", + QueueUrl: "https://queue-url", + GetQueryResultsSleepTime: 100 * time.Millisecond, + BatchMaxItems: 20000, + BatchMaxInterval: 1 * time.Minute, + }, + }, + { + name: "missing table name", + input: func() Config { + cfg := validConfig + cfg.TableName = "" + return cfg + }, + wantErr: "TableName is not specified", + }, + { + name: "invalid table name", + input: func() Config { + cfg := validConfig + cfg.TableName = "table with space" + return cfg + }, + wantErr: "TableName can contains only alphanumeric or underscore character", + }, + { + name: "missing topicARN", + input: func() Config { + cfg := validConfig + cfg.TopicARN = "" + return cfg + }, + wantErr: "TopicARN is not specified", + }, + { + name: "missing LocationS3", + input: func() Config { + cfg := 
validConfig + cfg.LocationS3 = "" + return cfg + }, + wantErr: "LocationS3 is not specified", + }, + { + name: "invalid LocationS3", + input: func() Config { + cfg := validConfig + cfg.LocationS3 = "https://abc" + return cfg + }, + wantErr: "LocationS3 must be valid url and start with s3", + }, + { + name: "missing QueueUrl", + input: func() Config { + cfg := validConfig + cfg.QueueUrl = "" + return cfg + }, + wantErr: "QueueUrl is not specified", + }, + { + name: "invalid QueueUrl", + input: func() Config { + cfg := validConfig + cfg.QueueUrl = "s3://abc" + return cfg + }, + wantErr: "QueueUrl must be valid url and start with https", + }, + { + name: "invalid LimiterBurst and LimiterRate combination", + input: func() Config { + cfg := validConfig + cfg.LimiterBurst = 0 + cfg.LimiterRate = 2.5 + return cfg + }, + wantErr: "LimiterBurst must be greater than 0 if LimiterRate is used", + }, + { + name: "invalid LimiterRate and LimiterBurst combination", + input: func() Config { + cfg := validConfig + cfg.LimiterBurst = 3 + cfg.LimiterRate = 0 + return cfg + }, + wantErr: "LimiterRate must be greater than 0 if LimiterBurst is used", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := tt.input() + err := cfg.CheckAndSetDefaults() + if tt.wantErr == "" { + require.NoError(t, err, "CheckAndSetDefaults return unexpected err") + require.Empty(t, cmp.Diff(tt.want, cfg, cmpopts.EquateApprox(0, 0.0001), cmpopts.IgnoreFields(Config{}, "Clock", "UIDGenerator"))) + } else { + require.ErrorContains(t, err, tt.wantErr) + } + }) + } +} diff --git a/lib/service/service.go b/lib/service/service.go index 045caa77bb188..75a478f211e20 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -82,6 +82,7 @@ import ( "github.com/gravitational/teleport/lib/cloud" "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/events" + "github.com/gravitational/teleport/lib/events/athena" 
"github.com/gravitational/teleport/lib/events/dynamoevents" "github.com/gravitational/teleport/lib/events/filesessions" "github.com/gravitational/teleport/lib/events/firestoreevents" @@ -1362,6 +1363,20 @@ func initAuthExternalAuditLog(ctx context.Context, auditConfig types.ClusterAudi return nil, trace.Wrap(err) } loggers = append(loggers, logger) + case teleport.ComponentAthena: + hasNonFileLog = true + cfg := athena.Config{ + Region: auditConfig.Region(), + } + err = cfg.SetFromURL(uri) + if err != nil { + return nil, trace.Wrap(err) + } + logger, err := athena.New(ctx, cfg) + if err != nil { + return nil, trace.Wrap(err) + } + loggers = append(loggers, logger) case teleport.SchemeFile: if uri.Path == "" { return nil, trace.BadParameter("unsupported audit uri: %q (missing path component)", uri) From e0130fe5c8bd0471d6c700b544f8dd6df3b74fad Mon Sep 17 00:00:00 2001 From: Tobiasz Heller Date: Wed, 29 Mar 2023 11:57:42 +0200 Subject: [PATCH 2/9] use sqs timeout as duration --- lib/events/athena/athena.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/events/athena/athena.go b/lib/events/athena/athena.go index 91563a50dcc1f..9af2ccb3df34f 100644 --- a/lib/events/athena/athena.go +++ b/lib/events/athena/athena.go @@ -25,9 +25,9 @@ import ( const ( // TODO(tobiaszheller): move to batcher.go in other PR. - // maxWaitTimeOnReceiveMessageFromSQSInSeconds defines how long single + // maxWaitTimeOnReceiveMessageFromSQS defines how long single // receiveFromQueue will wait if there is no max events (10). - maxWaitTimeOnReceiveMessageFromSQSInSeconds = 5 + maxWaitTimeOnReceiveMessageFromSQS = 5 * time.Second ) // Config structure represents Athena configuration. 
@@ -150,7 +150,7 @@ func (cfg *Config) CheckAndSetDefaults() error { cfg.BatchMaxInterval = 1 * time.Minute } - if cfg.BatchMaxInterval < maxWaitTimeOnReceiveMessageFromSQSInSeconds*time.Second { + if cfg.BatchMaxInterval < maxWaitTimeOnReceiveMessageFromSQS { // If BatchMaxInterval is shorter it will mean we will cancel all // requests when there is less messages than 10 on queue. // This can be fixed by shortening timeout on read, but realisticly From 31399319d08513b2045b9f8b4b2928a14aecb9bd Mon Sep 17 00:00:00 2001 From: Tobiasz Heller Date: Wed, 29 Mar 2023 11:59:56 +0200 Subject: [PATCH 3/9] compile regexp once --- lib/events/athena/athena.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/events/athena/athena.go b/lib/events/athena/athena.go index 9af2ccb3df34f..74b81b03956c2 100644 --- a/lib/events/athena/athena.go +++ b/lib/events/athena/athena.go @@ -337,10 +337,10 @@ func (l *Log) StreamSessionEvents(ctx context.Context, sessionID session.ID, sta return c, e } +var isAlphanumericOrUnderscoreRe = regexp.MustCompile("^[a-zA-Z0-9_]+$") + func isAlphanumericOrUnderscore(s string) bool { - pattern := "^[a-zA-Z0-9_]+$" - re := regexp.MustCompile(pattern) - return re.MatchString(s) + return isAlphanumericOrUnderscoreRe.MatchString(s) } func isValidUrlWithScheme(s string) (string, bool) { From 89399d062fe83c2163fccf5c7bce6f308f48c312 Mon Sep 17 00:00:00 2001 From: Tobiasz Heller Date: Wed, 29 Mar 2023 12:01:23 +0200 Subject: [PATCH 4/9] Rename to QueueURL --- lib/events/athena/athena.go | 14 +++++++------- lib/events/athena/athena_test.go | 18 +++++++++--------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/lib/events/athena/athena.go b/lib/events/athena/athena.go index 74b81b03956c2..41e0f6c3c5576 100644 --- a/lib/events/athena/athena.go +++ b/lib/events/athena/athena.go @@ -69,8 +69,8 @@ type Config struct { // Batcher settings. - // QueueUrl is URL of SQS, which is set as subscriber to SNS topic (required). 
- QueueUrl string + // QueueURL is URL of SQS, which is set as subscriber to SNS topic (required). + QueueURL string // BatchMaxItems defines how many items can be stored in single Parquet // batch (optional). // It's soft limit. @@ -130,11 +130,11 @@ func (cfg *Config) CheckAndSetDefaults() error { return trace.BadParameter("LargeEventsS3 must be valid url and start with s3") } - if cfg.QueueUrl == "" { - return trace.BadParameter("QueueUrl is not specified") + if cfg.QueueURL == "" { + return trace.BadParameter("QueueURL is not specified") } - if scheme, ok := isValidUrlWithScheme(cfg.QueueUrl); !ok || scheme != "https" { - return trace.BadParameter("QueueUrl must be valid url and start with https") + if scheme, ok := isValidUrlWithScheme(cfg.QueueURL); !ok || scheme != "https" { + return trace.BadParameter("QueueURL must be valid url and start with https") } if cfg.GetQueryResultsSleepTime == 0 { @@ -240,7 +240,7 @@ func (cfg *Config) SetFromURL(url *url.URL) error { queueURL := url.Query().Get("queueURL") if queueURL != "" { - cfg.QueueUrl = queueURL + cfg.QueueURL = queueURL } batchMaxItems := url.Query().Get("batchMaxItems") if batchMaxItems != "" { diff --git a/lib/events/athena/athena_test.go b/lib/events/athena/athena_test.go index 2f0b44ae3d2f2..e3b23fec2de5f 100644 --- a/lib/events/athena/athena_test.go +++ b/lib/events/athena/athena_test.go @@ -55,7 +55,7 @@ func TestConfig_SetFromURL(t *testing.T) { want: Config{ TableName: "tbl", Database: "db", - QueueUrl: "https://queueURL", + QueueURL: "https://queueURL", BatchMaxItems: 1000, BatchMaxInterval: 10 * time.Second, }, @@ -94,7 +94,7 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) { TopicARN: "arn:topic", LargeEventsS3: "s3://large-payloads-bucket", LocationS3: "s3://events-bucket", - QueueUrl: "https://queue-url", + QueueURL: "https://queue-url", } tests := []struct { name string @@ -113,7 +113,7 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) { TopicARN: "arn:topic", LargeEventsS3: 
"s3://large-payloads-bucket", LocationS3: "s3://events-bucket", - QueueUrl: "https://queue-url", + QueueURL: "https://queue-url", GetQueryResultsSleepTime: 100 * time.Millisecond, BatchMaxItems: 20000, BatchMaxInterval: 1 * time.Minute, @@ -165,22 +165,22 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) { wantErr: "LocationS3 must be valid url and start with s3", }, { - name: "missing QueueUrl", + name: "missing QueueURL", input: func() Config { cfg := validConfig - cfg.QueueUrl = "" + cfg.QueueURL = "" return cfg }, - wantErr: "QueueUrl is not specified", + wantErr: "QueueURL is not specified", }, { - name: "invalid QueueUrl", + name: "invalid QueueURL", input: func() Config { cfg := validConfig - cfg.QueueUrl = "s3://abc" + cfg.QueueURL = "s3://abc" return cfg }, - wantErr: "QueueUrl must be valid url and start with https", + wantErr: "QueueURL must be valid url and start with https", }, { name: "invalid LimiterBurst and LimiterRate combination", From 7c3bb77cf274d62ad11cb7fbf629c443de84c54a Mon Sep 17 00:00:00 2001 From: Tobiasz Heller Date: Wed, 29 Mar 2023 12:05:44 +0200 Subject: [PATCH 5/9] add aws docs and comments --- lib/events/athena/athena.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/events/athena/athena.go b/lib/events/athena/athena.go index 41e0f6c3c5576..05e5afa0b9320 100644 --- a/lib/events/athena/athena.go +++ b/lib/events/athena/athena.go @@ -87,8 +87,9 @@ type Config struct { } // CheckAndSetDefaults is a helper returns an error if the supplied configuration -// is not enough to connect to SNS +// is not enough to setup Athena based audit log. 
func (cfg *Config) CheckAndSetDefaults() error { + // AWS restrictions (https://docs.aws.amazon.com/athena/latest/ug/tables-databases-columns-names.html) const glueNameMaxLen = 255 if cfg.Database == "" { return trace.BadParameter("Database is not specified") @@ -273,7 +274,7 @@ type Log struct { session *awssession.Session } -// New returns new instance of athena based audit logger. +// New creates an instance of an Athena based audit log. func New(ctx context.Context, cfg Config) (*Log, error) { err := cfg.CheckAndSetDefaults() if err != nil { From f724369a105ee3e8c8350933f824a772eae8a6b8 Mon Sep 17 00:00:00 2001 From: Tobiasz Heller Date: Wed, 29 Mar 2023 12:08:54 +0200 Subject: [PATCH 6/9] update log decsription --- lib/events/athena/athena.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/lib/events/athena/athena.go b/lib/events/athena/athena.go index 05e5afa0b9320..cb22eeb4cf0d4 100644 --- a/lib/events/athena/athena.go +++ b/lib/events/athena/athena.go @@ -263,7 +263,13 @@ func (cfg *Config) SetFromURL(url *url.URL) error { return nil } -// Log is a aws storage of events. +// Log is an events storage backend. +// +// It's using SNS for emitting events. +// SQS is used as subscriber for SNS topic. +// Consumer uses SQS to read multiple events, create batch, convert it to +// Parquet and send it to S3 for long term storage. +// Athena is used for querying Parquet files on S3.
type Log struct { // Entry is a log entry *log.Entry From 3d87fad13dad87b179494136872e0d170fdc6b02 Mon Sep 17 00:00:00 2001 From: Tobiasz Heller Date: Wed, 29 Mar 2023 12:44:02 +0200 Subject: [PATCH 7/9] add license --- lib/events/athena/athena.go | 14 ++++++++++++++ lib/events/athena/athena_test.go | 14 ++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/lib/events/athena/athena.go b/lib/events/athena/athena.go index cb22eeb4cf0d4..0d9ad8d17bc37 100644 --- a/lib/events/athena/athena.go +++ b/lib/events/athena/athena.go @@ -1,3 +1,17 @@ +// Copyright 2023 Gravitational, Inc +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package athena import ( diff --git a/lib/events/athena/athena_test.go b/lib/events/athena/athena_test.go index e3b23fec2de5f..72dc29079bef4 100644 --- a/lib/events/athena/athena_test.go +++ b/lib/events/athena/athena_test.go @@ -1,3 +1,17 @@ +// Copyright 2023 Gravitational, Inc +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + package athena import ( From 9fee1e1056ef8912213cbda29d051d9757548882 Mon Sep 17 00:00:00 2001 From: Tobiasz Heller Date: Fri, 31 Mar 2023 09:55:07 +0200 Subject: [PATCH 8/9] rename getQueryResultsInterval --- lib/events/athena/athena.go | 18 +++++++++--------- lib/events/athena/athena_test.go | 30 +++++++++++++++--------------- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/lib/events/athena/athena.go b/lib/events/athena/athena.go index 0d9ad8d17bc37..4f24680200a30 100644 --- a/lib/events/athena/athena.go +++ b/lib/events/athena/athena.go @@ -73,9 +73,9 @@ type Config struct { QueryResultsS3 string // Workgroup is Glue workgroup where Athena queries are executed (optional). Workgroup string - // GetQueryResultsSleepTime is used to define how long query will wait before + // GetQueryResultsInterval is used to define how long query will wait before // checking again for results status if previous status was not ready (optional). - GetQueryResultsSleepTime time.Duration + GetQueryResultsInterval time.Duration // LimiterRate defines rate at which search_event rate limiter is filled (optional). LimiterRate float64 // LimiterBurst defines rate limit bucket capacity (optional). 
@@ -152,8 +152,8 @@ func (cfg *Config) CheckAndSetDefaults() error { return trace.BadParameter("QueueURL must be valid url and start with https") } - if cfg.GetQueryResultsSleepTime == 0 { - cfg.GetQueryResultsSleepTime = 100 * time.Millisecond + if cfg.GetQueryResultsInterval == 0 { + cfg.GetQueryResultsInterval = 100 * time.Millisecond } if cfg.BatchMaxItems == 0 { @@ -228,13 +228,13 @@ func (cfg *Config) SetFromURL(url *url.URL) error { if workgroup != "" { cfg.Workgroup = workgroup } - getQueryResultsSleepTime := url.Query().Get("getQueryResultsSleepTime") - if getQueryResultsSleepTime != "" { - dur, err := time.ParseDuration(getQueryResultsSleepTime) + getQueryResultsInterval := url.Query().Get("getQueryResultsInterval") + if getQueryResultsInterval != "" { + dur, err := time.ParseDuration(getQueryResultsInterval) if err != nil { - return trace.BadParameter("invalid getQueryResultsSleepTime value: %v", err) + return trace.BadParameter("invalid getQueryResultsInterval value: %v", err) } - cfg.GetQueryResultsSleepTime = dur + cfg.GetQueryResultsInterval = dur } rateInString := url.Query().Get("limiterRate") if rateInString != "" { diff --git a/lib/events/athena/athena_test.go b/lib/events/athena/athena_test.go index 72dc29079bef4..60d084c2df5a0 100644 --- a/lib/events/athena/athena_test.go +++ b/lib/events/athena/athena_test.go @@ -54,13 +54,13 @@ func TestConfig_SetFromURL(t *testing.T) { }, { name: "params to querier - part 2", - url: "athena://db.tbl/?getQueryResultsSleepTime=200ms&limiterRate=0.642&limiterBurst=3", + url: "athena://db.tbl/?getQueryResultsInterval=200ms&limiterRate=0.642&limiterBurst=3", want: Config{ - TableName: "tbl", - Database: "db", - GetQueryResultsSleepTime: 200 * time.Millisecond, - LimiterRate: 0.642, - LimiterBurst: 3, + TableName: "tbl", + Database: "db", + GetQueryResultsInterval: 200 * time.Millisecond, + LimiterRate: 0.642, + LimiterBurst: 3, }, }, { @@ -122,15 +122,15 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) { 
return validConfig }, want: Config{ - Database: "db", - TableName: "tbl", - TopicARN: "arn:topic", - LargeEventsS3: "s3://large-payloads-bucket", - LocationS3: "s3://events-bucket", - QueueURL: "https://queue-url", - GetQueryResultsSleepTime: 100 * time.Millisecond, - BatchMaxItems: 20000, - BatchMaxInterval: 1 * time.Minute, + Database: "db", + TableName: "tbl", + TopicARN: "arn:topic", + LargeEventsS3: "s3://large-payloads-bucket", + LocationS3: "s3://events-bucket", + QueueURL: "https://queue-url", + GetQueryResultsInterval: 100 * time.Millisecond, + BatchMaxItems: 20000, + BatchMaxInterval: 1 * time.Minute, }, }, { From 7a0fdf8900c4a51db38ad372260c913f00dcafe8 Mon Sep 17 00:00:00 2001 From: Tobiasz Heller Date: Fri, 31 Mar 2023 10:52:35 +0200 Subject: [PATCH 9/9] use aws sdk v2 --- lib/events/athena/athena.go | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/lib/events/athena/athena.go b/lib/events/athena/athena.go index 4f24680200a30..39b5cfaf42748 100644 --- a/lib/events/athena/athena.go +++ b/lib/events/athena/athena.go @@ -23,8 +23,8 @@ import ( "strings" "time" - "github.com/aws/aws-sdk-go/aws" - awssession "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go-v2/aws" + awsconfig "github.com/aws/aws-sdk-go-v2/config" "github.com/gravitational/trace" "github.com/jonboulle/clockwork" log "github.com/sirupsen/logrus" @@ -290,8 +290,7 @@ type Log struct { // Config is a backend configuration Config - // session holds the AWS client. - session *awssession.Session + awsConfig aws.Config } // New creates an instance of an Athena based audit log. @@ -307,18 +306,15 @@ func New(ctx context.Context, cfg Config) (*Log, error) { Entry: logEntry, Config: cfg, } - // Create an AWS session using default SDK behavior, i.e. it will interpret - // the environment and ~/.aws directory just like an AWS CLI tool would. 
- l.session, err = awssession.NewSessionWithOptions(awssession.Options{ - SharedConfigState: awssession.SharedConfigEnable, - }) + + l.awsConfig, err = awsconfig.LoadDefaultConfig(ctx) if err != nil { return nil, trace.Wrap(err) } // override the default environment (region + credentials) with the values // from the config. if cfg.Region != "" { - l.session.Config.Region = aws.String(cfg.Region) + l.awsConfig.Region = cfg.Region } // TODO(tobiaszheller): initialize publisher