From 070d0a1d944a4c0f4d3f5a1ee8df46211713c65f Mon Sep 17 00:00:00 2001 From: Leszek Charkiewicz Date: Thu, 19 Sep 2024 18:15:06 +0200 Subject: [PATCH 1/8] Add support for custom SQS consumer lock and disabling consumer --- lib/events/athena/athena.go | 26 +++++++++++++++++++++++++- lib/events/athena/athena_test.go | 6 +++++- lib/events/athena/consumer.go | 4 +++- lib/events/athena/consumer_test.go | 3 +++ 4 files changed, 36 insertions(+), 3 deletions(-) diff --git a/lib/events/athena/athena.go b/lib/events/athena/athena.go index 4e25219aca4f7..bf99234684886 100644 --- a/lib/events/athena/athena.go +++ b/lib/events/athena/athena.go @@ -20,6 +20,7 @@ package athena import ( "context" + "fmt" "io" "net/url" "regexp" @@ -125,6 +126,12 @@ type Config struct { BatchMaxItems int // BatchMaxInterval defined interval at which parquet files will be created (optional). BatchMaxInterval time.Duration + // ConsumerLockName defines a name of a SQS consumer lock (optional). + // If provided, it will be prefixed with "athena_" to avoid accidental + // collision with existing locks. + ConsumerLockName string + // ConsumerDisabled defines if SQS consumer should be disabled (optional). + ConsumerDisabled bool // Clock is a clock interface, used in tests. Clock clockwork.Clock @@ -250,6 +257,10 @@ func (cfg *Config) CheckAndSetDefaults(ctx context.Context) error { return trace.BadParameter("BatchMaxInterval too short, must be greater than 5s") } + if cfg.ConsumerLockName == "" { + cfg.ConsumerLockName = "athena_lock" + } + if cfg.LimiterRefillAmount < 0 { return trace.BadParameter("LimiterRefillAmount cannot be nagative") } @@ -417,6 +428,17 @@ func (cfg *Config) SetFromURL(url *url.URL) error { } cfg.BatchMaxInterval = dur } + consumerLockName := url.Query().Get("consumerLockName") + if consumerLockName != "" { + cfg.ConsumerLockName = fmt.Sprintf("athena_%s", consumerLockName) + } + if val := url.Query().Get("consumerDisabled"); val != "" { + boolVal, err := strconv.ParseBool(val) + if err != nil { + return trace.BadParameter("invalid consumerDisabled value: %v", err) + } + cfg.ConsumerDisabled = boolVal + } return nil } @@ -497,7 +519,9 @@ func New(ctx context.Context, cfg Config) (*Log, error) { consumerCloser: consumer, } - go consumer.run(consumerCtx) + if !cfg.ConsumerDisabled { + go consumer.run(consumerCtx) + } return l, nil } diff --git a/lib/events/athena/athena_test.go b/lib/events/athena/athena_test.go index e0e52d8903b73..14ac86856997d 100644 --- a/lib/events/athena/athena_test.go +++ b/lib/events/athena/athena_test.go @@ -93,13 +93,15 @@ func TestConfig_SetFromURL(t *testing.T) { }, { name: "params to batcher", - url: "athena://db.tbl/?queueURL=https://queueURL&batchMaxItems=1000&batchMaxInterval=10s", + url: "athena://db.tbl/?queueURL=https://queueURL&batchMaxItems=1000&batchMaxInterval=10s&consumerLockName=mylock&consumerDisabled=true", want: Config{ TableName: "tbl", Database: "db", QueueURL: "https://queueURL", BatchMaxItems: 1000, BatchMaxInterval: 10 * time.Second, + ConsumerLockName: "athena_mylock", + ConsumerDisabled: true, }, }, { @@ -187,6 +189,7 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) { GetQueryResultsInterval: 100 * time.Millisecond, BatchMaxItems: 20000, BatchMaxInterval: 1 * time.Minute, + ConsumerLockName: "athena_lock", PublisherConsumerAWSConfig: dummyAWSCfg, StorerQuerierAWSConfig: dummyAWSCfg, Backend: mockBackend{}, @@ -212,6 +215,7 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) { GetQueryResultsInterval: 100 * time.Millisecond, BatchMaxItems: 20000, BatchMaxInterval: 1 * time.Minute, + ConsumerLockName: "athena_lock", PublisherConsumerAWSConfig: dummyAWSCfg, StorerQuerierAWSConfig: dummyAWSCfg, Backend: mockBackend{}, diff --git a/lib/events/athena/consumer.go b/lib/events/athena/consumer.go index 809c20f676d27..2f4dcb6a264dc 100644 --- a/lib/events/athena/consumer.go +++ b/lib/events/athena/consumer.go @@ -79,6 +79,7 @@ type consumer struct { storeLocationBucket string batchMaxItems int batchMaxInterval time.Duration + consumerLockName string // perDateFileParquetWriter returns file writer per date. // Added in config to allow testing. @@ -157,6 +158,7 @@ func newConsumer(cfg Config, cancelFn context.CancelFunc) (*consumer, error) { storeLocationBucket: cfg.locationS3Bucket, batchMaxItems: cfg.BatchMaxItems, batchMaxInterval: cfg.BatchMaxInterval, + consumerLockName: cfg.ConsumerLockName, collectConfig: collectCfg, sqsDeleter: sqsClient, queueURL: cfg.QueueURL, @@ -258,7 +260,7 @@ func (c *consumer) runContinuouslyOnSingleAuth(ctx context.Context, eventsProces err := backend.RunWhileLocked(ctx, backend.RunWhileLockedConfig{ LockConfiguration: backend.LockConfiguration{ Backend: c.backend, - LockNameComponents: []string{"athena_lock"}, + LockNameComponents: []string{c.consumerLockName}, // TTL is higher then batchMaxInterval because we want to optimize // for low backend writes. TTL: 5 * c.batchMaxInterval, diff --git a/lib/events/athena/consumer_test.go b/lib/events/athena/consumer_test.go index 1aa12c684fcc7..4bed63b42df42 100644 --- a/lib/events/athena/consumer_test.go +++ b/lib/events/athena/consumer_test.go @@ -422,16 +422,19 @@ func TestConsumerRunContinuouslyOnSingleAuth(t *testing.T) { defer backend.Close() batchInterval := 20 * time.Millisecond + lockName := "athena_lock" c1 := consumer{ logger: log, backend: backend, batchMaxInterval: batchInterval, + consumerLockName: lockName, } c2 := consumer{ logger: log, backend: backend, batchMaxInterval: batchInterval, + consumerLockName: lockName, } m1 := mockEventsProcessor{interval: batchInterval} m2 := mockEventsProcessor{interval: batchInterval} From 39348276ea910afd14a5d4657579428b924c6905 Mon Sep 17 00:00:00 2001 From: Leszek Charkiewicz Date: Fri, 20 Sep 2024 10:24:00 +0200 Subject: [PATCH 2/8] Update lib/events/athena/athena.go Co-authored-by: Edoardo Spadolini --- lib/events/athena/athena.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/events/athena/athena.go b/lib/events/athena/athena.go index bf99234684886..0254f4b182b4f 100644 --- a/lib/events/athena/athena.go +++ b/lib/events/athena/athena.go @@ -428,8 +428,7 @@ func (cfg *Config) SetFromURL(url *url.URL) error { } cfg.BatchMaxInterval = dur } - consumerLockName := url.Query().Get("consumerLockName") - if consumerLockName != "" { + if consumerLockName := url.Query().Get("consumerLockName"); consumerLockName != "" { cfg.ConsumerLockName = fmt.Sprintf("athena_%s", consumerLockName) } if val := url.Query().Get("consumerDisabled"); val != "" { From 5b9d359d70918e6c56e6ded1456777481b87da30 Mon Sep 17 00:00:00 2001 From: Leszek Charkiewicz Date: Fri, 20 Sep 2024 11:16:37 +0200 Subject: [PATCH 3/8] Use LockNameComponents for constructing a lock name --- lib/events/athena/athena.go | 9 ++++----- lib/events/athena/athena_test.go | 6 +++--- lib/events/athena/consumer.go | 4 ++-- lib/events/athena/consumer_test.go | 2 +- 4 files changed, 10 insertions(+), 11 deletions(-) diff --git a/lib/events/athena/athena.go b/lib/events/athena/athena.go index 0254f4b182b4f..f9af99af0fbbc 100644 --- a/lib/events/athena/athena.go +++ b/lib/events/athena/athena.go @@ -20,7 +20,6 @@ package athena import ( "context" - "fmt" "io" "net/url" "regexp" @@ -129,7 +128,7 @@ type Config struct { // ConsumerLockName defines a name of a SQS consumer lock (optional). // If provided, it will be prefixed with "athena_" to avoid accidental // collision with existing locks. - ConsumerLockName string + ConsumerLockName []string // ConsumerDisabled defines if SQS consumer should be disabled (optional). ConsumerDisabled bool @@ -257,8 +256,8 @@ func (cfg *Config) CheckAndSetDefaults(ctx context.Context) error { return trace.BadParameter("BatchMaxInterval too short, must be greater than 5s") } - if cfg.ConsumerLockName == "" { - cfg.ConsumerLockName = "athena_lock" + if len(cfg.ConsumerLockName) == 0 { + cfg.ConsumerLockName = []string{"athena_lock"} } if cfg.LimiterRefillAmount < 0 { @@ -429,7 +428,7 @@ func (cfg *Config) SetFromURL(url *url.URL) error { cfg.BatchMaxInterval = dur } if consumerLockName := url.Query().Get("consumerLockName"); consumerLockName != "" { - cfg.ConsumerLockName = fmt.Sprintf("athena_%s", consumerLockName) + cfg.ConsumerLockName = []string{"athena", consumerLockName} } if val := url.Query().Get("consumerDisabled"); val != "" { boolVal, err := strconv.ParseBool(val) diff --git a/lib/events/athena/athena_test.go b/lib/events/athena/athena_test.go index 14ac86856997d..71107a13967ec 100644 --- a/lib/events/athena/athena_test.go +++ b/lib/events/athena/athena_test.go @@ -100,7 +100,7 @@ func TestConfig_SetFromURL(t *testing.T) { QueueURL: "https://queueURL", BatchMaxItems: 1000, BatchMaxInterval: 10 * time.Second, - ConsumerLockName: "athena_mylock", + ConsumerLockName: []string{"athena", "mylock"}, ConsumerDisabled: true, }, }, @@ -189,7 +189,7 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) { GetQueryResultsInterval: 100 * time.Millisecond, BatchMaxItems: 20000, BatchMaxInterval: 1 * time.Minute, - ConsumerLockName: "athena_lock", + ConsumerLockName: []string{"athena_lock"}, PublisherConsumerAWSConfig: dummyAWSCfg, StorerQuerierAWSConfig: dummyAWSCfg, Backend: mockBackend{}, @@ -215,7 +215,7 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) { GetQueryResultsInterval: 100 * time.Millisecond, BatchMaxItems: 20000, BatchMaxInterval: 1 * time.Minute, - ConsumerLockName: "athena_lock", + ConsumerLockName: []string{"athena_lock"}, PublisherConsumerAWSConfig: dummyAWSCfg, StorerQuerierAWSConfig: dummyAWSCfg, Backend: mockBackend{}, diff --git a/lib/events/athena/consumer.go b/lib/events/athena/consumer.go index 2f4dcb6a264dc..325ea2c909ae4 100644 --- a/lib/events/athena/consumer.go +++ b/lib/events/athena/consumer.go @@ -79,7 +79,7 @@ type consumer struct { storeLocationBucket string batchMaxItems int batchMaxInterval time.Duration - consumerLockName string + consumerLockName []string // perDateFileParquetWriter returns file writer per date. // Added in config to allow testing. @@ -260,7 +260,7 @@ func (c *consumer) runContinuouslyOnSingleAuth(ctx context.Context, eventsProces err := backend.RunWhileLocked(ctx, backend.RunWhileLockedConfig{ LockConfiguration: backend.LockConfiguration{ Backend: c.backend, - LockNameComponents: []string{c.consumerLockName}, + LockNameComponents: c.consumerLockName, // TTL is higher then batchMaxInterval because we want to optimize // for low backend writes. TTL: 5 * c.batchMaxInterval, diff --git a/lib/events/athena/consumer_test.go b/lib/events/athena/consumer_test.go index 4bed63b42df42..d6f12afd3b1ac 100644 --- a/lib/events/athena/consumer_test.go +++ b/lib/events/athena/consumer_test.go @@ -422,7 +422,7 @@ func TestConsumerRunContinuouslyOnSingleAuth(t *testing.T) { defer backend.Close() batchInterval := 20 * time.Millisecond - lockName := "athena_lock" + lockName := []string{"athena_lock"} c1 := consumer{ logger: log, From 1ab621d72d33668f43509195c7c809a9e7e8f3f7 Mon Sep 17 00:00:00 2001 From: Leszek Charkiewicz Date: Fri, 20 Sep 2024 21:39:15 +0200 Subject: [PATCH 4/8] Add a test for disabled consumer --- lib/events/athena/athena.go | 22 ++++++++++++++-------- lib/service/service_test.go | 10 ++++++++++ 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/lib/events/athena/athena.go b/lib/events/athena/athena.go index f9af99af0fbbc..9100ad55e7350 100644 --- a/lib/events/athena/athena.go +++ b/lib/events/athena/athena.go @@ -504,20 +504,22 @@ func New(ctx context.Context, cfg Config) (*Log, error) { return nil, trace.Wrap(err) } - consumerCtx, consumerCancel := context.WithCancel(ctx) - - consumer, err := newConsumer(cfg, consumerCancel) - if err != nil { - return nil, trace.Wrap(err) - } - l := &Log{ publisher: newPublisherFromAthenaConfig(cfg), querier: querier, - consumerCloser: consumer, + consumerCloser: nil, } if !cfg.ConsumerDisabled { + consumerCtx, consumerCancel := context.WithCancel(ctx) + + consumer, err := newConsumer(cfg, consumerCancel) + if err != nil { + return nil, trace.Wrap(err) + } + + l.consumerCloser = consumer + go consumer.run(consumerCtx) } @@ -549,6 +551,10 @@ func (l *Log) Close() error { return trace.Wrap(l.consumerCloser.Close()) } +func (l *Log) IsConsumerDisabled() bool { + return l.consumerCloser == nil +} + var isAlphanumericOrUnderscoreRe = regexp.MustCompile("^[a-zA-Z0-9_]+$") func isAlphanumericOrUnderscore(s string) bool { diff --git a/lib/service/service_test.go b/lib/service/service_test.go index 6e3ab2554977b..7240ddcf65947 100644 --- a/lib/service/service_test.go +++ b/lib/service/service_test.go @@ -539,6 +539,16 @@ func TestAthenaAuditLogSetup(t *testing.T) { require.True(t, ok, "invalid logger type, got %T", v) }, }, + { + name: "valid athena config with disabled consumer", + uris: []string{sampleAthenaURI + "&consumerDisabled=false"}, + externalAudit: externalAuditStorageDisabled, + wantFn: func(t *testing.T, alog events.AuditLogger) { + v, ok := alog.(*athena.Log) + require.True(t, ok, "invalid logger type, got %T", v) + require.True(t, v.IsConsumerDisabled(), "consumer is not disabled") + }, + }, { name: "config with rate limit - should use events.SearchEventsLimiter", uris: []string{sampleAthenaURI + "&limiterRefillAmount=3&limiterBurst=2"}, From 0b75ffeb6ed45232701445f29dd44f6bc7c27761 Mon Sep 17 00:00:00 2001 From: Leszek Charkiewicz Date: Fri, 20 Sep 2024 21:40:34 +0200 Subject: [PATCH 5/8] Fix URI in disabled consumer test --- lib/service/service_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/service/service_test.go b/lib/service/service_test.go index 7240ddcf65947..70c8ebe9a7876 100644 --- a/lib/service/service_test.go +++ b/lib/service/service_test.go @@ -541,7 +541,7 @@ func TestAthenaAuditLogSetup(t *testing.T) { }, { name: "valid athena config with disabled consumer", - uris: []string{sampleAthenaURI + "&consumerDisabled=false"}, + uris: []string{sampleAthenaURI + "&consumerDisabled=true"}, externalAudit: externalAuditStorageDisabled, wantFn: func(t *testing.T, alog events.AuditLogger) { v, ok := alog.(*athena.Log) From 51e9f9af786e6c3c046a297173b5c4157fa49c7c Mon Sep 17 00:00:00 2001 From: Leszek Charkiewicz Date: Tue, 1 Oct 2024 15:07:30 +0200 Subject: [PATCH 6/8] Address feedback about ConsumerLockName being a single string --- lib/events/athena/athena.go | 10 +++------- lib/events/athena/athena_test.go | 6 +++--- lib/events/athena/consumer.go | 8 ++++++-- lib/events/athena/consumer_test.go | 3 --- 4 files changed, 12 insertions(+), 15 deletions(-) diff --git a/lib/events/athena/athena.go b/lib/events/athena/athena.go index 9100ad55e7350..0d7c3af0fe9f1 100644 --- a/lib/events/athena/athena.go +++ b/lib/events/athena/athena.go @@ -126,9 +126,9 @@ type Config struct { // BatchMaxInterval defined interval at which parquet files will be created (optional). BatchMaxInterval time.Duration // ConsumerLockName defines a name of a SQS consumer lock (optional). - // If provided, it will be prefixed with "athena_" to avoid accidental + // If provided, it will be prefixed with "athena/" to avoid accidental // collision with existing locks. - ConsumerLockName []string + ConsumerLockName string // ConsumerDisabled defines if SQS consumer should be disabled (optional). ConsumerDisabled bool @@ -256,10 +256,6 @@ func (cfg *Config) CheckAndSetDefaults(ctx context.Context) error { return trace.BadParameter("BatchMaxInterval too short, must be greater than 5s") } - if len(cfg.ConsumerLockName) == 0 { - cfg.ConsumerLockName = []string{"athena_lock"} - } - if cfg.LimiterRefillAmount < 0 { return trace.BadParameter("LimiterRefillAmount cannot be nagative") } @@ -428,7 +424,7 @@ func (cfg *Config) SetFromURL(url *url.URL) error { cfg.BatchMaxInterval = dur } if consumerLockName := url.Query().Get("consumerLockName"); consumerLockName != "" { - cfg.ConsumerLockName = []string{"athena", consumerLockName} + cfg.ConsumerLockName = consumerLockName } if val := url.Query().Get("consumerDisabled"); val != "" { boolVal, err := strconv.ParseBool(val) diff --git a/lib/events/athena/athena_test.go b/lib/events/athena/athena_test.go index 71107a13967ec..541a9d3b92980 100644 --- a/lib/events/athena/athena_test.go +++ b/lib/events/athena/athena_test.go @@ -100,7 +100,7 @@ func TestConfig_SetFromURL(t *testing.T) { QueueURL: "https://queueURL", BatchMaxItems: 1000, BatchMaxInterval: 10 * time.Second, - ConsumerLockName: []string{"athena", "mylock"}, + ConsumerLockName: "mylock", ConsumerDisabled: true, }, }, @@ -189,7 +189,7 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) { GetQueryResultsInterval: 100 * time.Millisecond, BatchMaxItems: 20000, BatchMaxInterval: 1 * time.Minute, - ConsumerLockName: []string{"athena_lock"}, + ConsumerLockName: "", PublisherConsumerAWSConfig: dummyAWSCfg, StorerQuerierAWSConfig: dummyAWSCfg, Backend: mockBackend{}, @@ -215,7 +215,7 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) { GetQueryResultsInterval: 100 * time.Millisecond, BatchMaxItems: 20000, BatchMaxInterval: 1 * time.Minute, - ConsumerLockName: []string{"athena_lock"}, + ConsumerLockName: "", PublisherConsumerAWSConfig: dummyAWSCfg, StorerQuerierAWSConfig: dummyAWSCfg, Backend: mockBackend{}, diff --git a/lib/events/athena/consumer.go b/lib/events/athena/consumer.go index 325ea2c909ae4..38fb1e6aee52c 100644 --- a/lib/events/athena/consumer.go +++ b/lib/events/athena/consumer.go @@ -79,7 +79,7 @@ type consumer struct { storeLocationBucket string batchMaxItems int batchMaxInterval time.Duration - consumerLockName []string + consumerLockName string // perDateFileParquetWriter returns file writer per date. // Added in config to allow testing. @@ -257,10 +257,14 @@ func (c *consumer) runContinuouslyOnSingleAuth(ctx context.Context, eventsProces case <-ctx.Done(): return default: + lockName := []string{"athena", c.consumerLockName} + if c.consumerLockName == "" { + lockName = []string{"athena_lock"} + } err := backend.RunWhileLocked(ctx, backend.RunWhileLockedConfig{ LockConfiguration: backend.LockConfiguration{ Backend: c.backend, - LockNameComponents: c.consumerLockName, + LockNameComponents: lockName, // TTL is higher then batchMaxInterval because we want to optimize // for low backend writes. TTL: 5 * c.batchMaxInterval, diff --git a/lib/events/athena/consumer_test.go b/lib/events/athena/consumer_test.go index d6f12afd3b1ac..1aa12c684fcc7 100644 --- a/lib/events/athena/consumer_test.go +++ b/lib/events/athena/consumer_test.go @@ -422,19 +422,16 @@ func TestConsumerRunContinuouslyOnSingleAuth(t *testing.T) { defer backend.Close() batchInterval := 20 * time.Millisecond - lockName := []string{"athena_lock"} c1 := consumer{ logger: log, backend: backend, batchMaxInterval: batchInterval, - consumerLockName: lockName, } c2 := consumer{ logger: log, backend: backend, batchMaxInterval: batchInterval, - consumerLockName: lockName, } m1 := mockEventsProcessor{interval: batchInterval} m2 := mockEventsProcessor{interval: batchInterval} From cc58a2d8f486b6b1604abd3c3f388076d8b26d6c Mon Sep 17 00:00:00 2001 From: Leszek Charkiewicz Date: Tue, 1 Oct 2024 15:32:52 +0200 Subject: [PATCH 7/8] Update lib/events/athena/athena.go Co-authored-by: rosstimothy <39066650+rosstimothy@users.noreply.github.com> --- lib/events/athena/athena.go | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/events/athena/athena.go b/lib/events/athena/athena.go index 0d7c3af0fe9f1..d1c2ae49f6dcc 100644 --- a/lib/events/athena/athena.go +++ b/lib/events/athena/athena.go @@ -503,7 +503,6 @@ func New(ctx context.Context, cfg Config) (*Log, error) { l := &Log{ publisher: newPublisherFromAthenaConfig(cfg), querier: querier, - consumerCloser: nil, } if !cfg.ConsumerDisabled { From 15f1f779863a104825b9e56566662499b460fc31 Mon Sep 17 00:00:00 2001 From: Leszek Charkiewicz Date: Tue, 1 Oct 2024 15:56:27 +0200 Subject: [PATCH 8/8] Make linter happy --- lib/events/athena/athena.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/events/athena/athena.go b/lib/events/athena/athena.go index d1c2ae49f6dcc..c72c92daf91cc 100644 --- a/lib/events/athena/athena.go +++ b/lib/events/athena/athena.go @@ -501,8 +501,8 @@ func New(ctx context.Context, cfg Config) (*Log, error) { } l := &Log{ - publisher: newPublisherFromAthenaConfig(cfg), - querier: querier, + publisher: newPublisherFromAthenaConfig(cfg), + querier: querier, } if !cfg.ConsumerDisabled {