From dc48191d84d697b3d4ed06c538669d31cd5d615c Mon Sep 17 00:00:00 2001 From: Leszek Charkiewicz Date: Thu, 19 Sep 2024 18:15:06 +0200 Subject: [PATCH 1/8] Add support for custom SQS consumer lock and disabling consumer --- lib/events/athena/athena.go | 26 +++++++++++++++++++++++++- lib/events/athena/athena_test.go | 6 +++++- lib/events/athena/consumer.go | 4 +++- lib/events/athena/consumer_test.go | 3 +++ 4 files changed, 36 insertions(+), 3 deletions(-) diff --git a/lib/events/athena/athena.go b/lib/events/athena/athena.go index 493cc09d9063e..8f03cc80698a5 100644 --- a/lib/events/athena/athena.go +++ b/lib/events/athena/athena.go @@ -16,6 +16,7 @@ package athena import ( "context" + "fmt" "io" "net/url" "regexp" @@ -122,6 +123,12 @@ type Config struct { BatchMaxItems int // BatchMaxInterval defined interval at which parquet files will be created (optional). BatchMaxInterval time.Duration + // ConsumerLockName defines a name of a SQS consumer lock (optional). + // If provided, it will be prefixed with "athena_" to avoid accidental + // collision with existing locks. + ConsumerLockName string + // ConsumerDisabled defines if SQS consumer should be disabled (optional). + ConsumerDisabled bool // Clock is a clock interface, used in tests. Clock clockwork.Clock @@ -247,6 +254,10 @@ func (cfg *Config) CheckAndSetDefaults(ctx context.Context) error { return trace.BadParameter("BatchMaxInterval too short, must be greater than 5s") } + if cfg.ConsumerLockName == "" { + cfg.ConsumerLockName = "athena_lock" + } + if cfg.LimiterRefillAmount < 0 { return trace.BadParameter("LimiterRefillAmount cannot be nagative") } @@ -414,6 +425,17 @@ func (cfg *Config) SetFromURL(url *url.URL) error { } cfg.BatchMaxInterval = dur } + consumerLockName := url.Query().Get("consumerLockName") + if consumerLockName != "" { + cfg.ConsumerLockName = fmt.Sprintf("athena_%s", consumerLockName) + } + if val := url.Query().Get("consumerDisabled"); val != "" { + boolVal, err := strconv.ParseBool(val) + if err != nil { + return trace.BadParameter("invalid consumerDisabled value: %v", err) + } + cfg.ConsumerDisabled = boolVal + } return nil } @@ -494,7 +516,9 @@ func New(ctx context.Context, cfg Config) (*Log, error) { consumerCloser: consumer, } - go consumer.run(consumerCtx) + if !cfg.ConsumerDisabled { + go consumer.run(consumerCtx) + } return l, nil } diff --git a/lib/events/athena/athena_test.go b/lib/events/athena/athena_test.go index fb908c5aeb3de..9e6169f7d9cbd 100644 --- a/lib/events/athena/athena_test.go +++ b/lib/events/athena/athena_test.go @@ -89,13 +89,15 @@ func TestConfig_SetFromURL(t *testing.T) { }, { name: "params to batcher", - url: "athena://db.tbl/?queueURL=https://queueURL&batchMaxItems=1000&batchMaxInterval=10s", + url: "athena://db.tbl/?queueURL=https://queueURL&batchMaxItems=1000&batchMaxInterval=10s&consumerLockName=mylock&consumerDisabled=true", want: Config{ TableName: "tbl", Database: "db", QueueURL: "https://queueURL", BatchMaxItems: 1000, BatchMaxInterval: 10 * time.Second, + ConsumerLockName: "athena_mylock", + ConsumerDisabled: true, }, }, { @@ -183,6 +185,7 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) { GetQueryResultsInterval: 100 * time.Millisecond, BatchMaxItems: 20000, BatchMaxInterval: 1 * time.Minute, + ConsumerLockName: "athena_lock", PublisherConsumerAWSConfig: dummyAWSCfg, StorerQuerierAWSConfig: dummyAWSCfg, Backend: mockBackend{}, @@ -208,6 +211,7 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) { GetQueryResultsInterval: 100 * time.Millisecond, BatchMaxItems: 20000, BatchMaxInterval: 1 * time.Minute, + ConsumerLockName: "athena_lock", PublisherConsumerAWSConfig: dummyAWSCfg, StorerQuerierAWSConfig: dummyAWSCfg, Backend: mockBackend{}, diff --git a/lib/events/athena/consumer.go b/lib/events/athena/consumer.go index df1a27f909a73..b8801ec931d55 100644 --- a/lib/events/athena/consumer.go +++ b/lib/events/athena/consumer.go @@ -75,6 +75,7 @@ type consumer struct { storeLocationBucket string batchMaxItems int batchMaxInterval time.Duration + consumerLockName string // perDateFileParquetWriter returns file writer per date. // Added in config to allow testing. @@ -153,6 +154,7 @@ func newConsumer(cfg Config, cancelFn context.CancelFunc) (*consumer, error) { storeLocationBucket: cfg.locationS3Bucket, batchMaxItems: cfg.BatchMaxItems, batchMaxInterval: cfg.BatchMaxInterval, + consumerLockName: cfg.ConsumerLockName, collectConfig: collectCfg, sqsDeleter: sqsClient, queueURL: cfg.QueueURL, @@ -254,7 +256,7 @@ func (c *consumer) runContinuouslyOnSingleAuth(ctx context.Context, eventsProces err := backend.RunWhileLocked(ctx, backend.RunWhileLockedConfig{ LockConfiguration: backend.LockConfiguration{ Backend: c.backend, - LockNameComponents: []string{"athena_lock"}, + LockNameComponents: []string{c.consumerLockName}, // TTL is higher then batchMaxInterval because we want to optimize // for low backend writes. TTL: 5 * c.batchMaxInterval, diff --git a/lib/events/athena/consumer_test.go b/lib/events/athena/consumer_test.go index 16831aac6ff7e..254bf8e27c0cb 100644 --- a/lib/events/athena/consumer_test.go +++ b/lib/events/athena/consumer_test.go @@ -417,16 +417,19 @@ func TestConsumerRunContinuouslyOnSingleAuth(t *testing.T) { defer backend.Close() batchInterval := 20 * time.Millisecond + lockName := "athena_lock" c1 := consumer{ logger: log, backend: backend, batchMaxInterval: batchInterval, + consumerLockName: lockName, } c2 := consumer{ logger: log, backend: backend, batchMaxInterval: batchInterval, + consumerLockName: lockName, } m1 := mockEventsProcessor{interval: batchInterval} m2 := mockEventsProcessor{interval: batchInterval} From 2d41032026d3b228e4aed1ccb497a7c5b18f7ec5 Mon Sep 17 00:00:00 2001 From: Leszek Charkiewicz Date: Fri, 20 Sep 2024 10:24:00 +0200 Subject: [PATCH 2/8] Update lib/events/athena/athena.go Co-authored-by: Edoardo Spadolini --- lib/events/athena/athena.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/events/athena/athena.go b/lib/events/athena/athena.go index 8f03cc80698a5..da41614878ada 100644 --- a/lib/events/athena/athena.go +++ b/lib/events/athena/athena.go @@ -425,8 +425,7 @@ func (cfg *Config) SetFromURL(url *url.URL) error { } cfg.BatchMaxInterval = dur } - consumerLockName := url.Query().Get("consumerLockName") - if consumerLockName != "" { + if consumerLockName := url.Query().Get("consumerLockName"); consumerLockName != "" { cfg.ConsumerLockName = fmt.Sprintf("athena_%s", consumerLockName) } if val := url.Query().Get("consumerDisabled"); val != "" { From 85e6eaddfadbb91a34f9d9862f535ccc815e3a18 Mon Sep 17 00:00:00 2001 From: Leszek Charkiewicz Date: Fri, 20 Sep 2024 11:16:37 +0200 Subject: [PATCH 3/8] Use LockNameComponents for constructing a lock name --- lib/events/athena/athena.go | 9 ++++----- lib/events/athena/athena_test.go | 6 +++--- lib/events/athena/consumer.go | 4 ++-- lib/events/athena/consumer_test.go | 2 +- 4 files changed, 10 insertions(+), 11 deletions(-) diff --git a/lib/events/athena/athena.go b/lib/events/athena/athena.go index da41614878ada..dda1d9e541134 100644 --- a/lib/events/athena/athena.go +++ b/lib/events/athena/athena.go @@ -16,7 +16,6 @@ package athena import ( "context" - "fmt" "io" "net/url" "regexp" @@ -126,7 +125,7 @@ type Config struct { // ConsumerLockName defines a name of a SQS consumer lock (optional). // If provided, it will be prefixed with "athena_" to avoid accidental // collision with existing locks. - ConsumerLockName string + ConsumerLockName []string // ConsumerDisabled defines if SQS consumer should be disabled (optional). ConsumerDisabled bool @@ -254,8 +253,8 @@ func (cfg *Config) CheckAndSetDefaults(ctx context.Context) error { return trace.BadParameter("BatchMaxInterval too short, must be greater than 5s") } - if cfg.ConsumerLockName == "" { - cfg.ConsumerLockName = "athena_lock" + if len(cfg.ConsumerLockName) == 0 { + cfg.ConsumerLockName = []string{"athena_lock"} } if cfg.LimiterRefillAmount < 0 { @@ -426,7 +425,7 @@ func (cfg *Config) SetFromURL(url *url.URL) error { cfg.BatchMaxInterval = dur } if consumerLockName := url.Query().Get("consumerLockName"); consumerLockName != "" { - cfg.ConsumerLockName = fmt.Sprintf("athena_%s", consumerLockName) + cfg.ConsumerLockName = []string{"athena", consumerLockName} } if val := url.Query().Get("consumerDisabled"); val != "" { boolVal, err := strconv.ParseBool(val) diff --git a/lib/events/athena/athena_test.go b/lib/events/athena/athena_test.go index 9e6169f7d9cbd..c07f4a0769dcc 100644 --- a/lib/events/athena/athena_test.go +++ b/lib/events/athena/athena_test.go @@ -96,7 +96,7 @@ func TestConfig_SetFromURL(t *testing.T) { QueueURL: "https://queueURL", BatchMaxItems: 1000, BatchMaxInterval: 10 * time.Second, - ConsumerLockName: "athena_mylock", + ConsumerLockName: []string{"athena", "mylock"}, ConsumerDisabled: true, }, }, @@ -185,7 +185,7 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) { GetQueryResultsInterval: 100 * time.Millisecond, BatchMaxItems: 20000, BatchMaxInterval: 1 * time.Minute, - ConsumerLockName: "athena_lock", + ConsumerLockName: []string{"athena_lock"}, PublisherConsumerAWSConfig: dummyAWSCfg, StorerQuerierAWSConfig: dummyAWSCfg, Backend: mockBackend{}, @@ -211,7 +211,7 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) { GetQueryResultsInterval: 100 * time.Millisecond, BatchMaxItems: 20000, BatchMaxInterval: 1 * time.Minute, - ConsumerLockName: "athena_lock", + ConsumerLockName: []string{"athena_lock"}, PublisherConsumerAWSConfig: dummyAWSCfg, StorerQuerierAWSConfig: dummyAWSCfg, Backend: mockBackend{}, diff --git a/lib/events/athena/consumer.go b/lib/events/athena/consumer.go index b8801ec931d55..b5228608dfd4b 100644 --- a/lib/events/athena/consumer.go +++ b/lib/events/athena/consumer.go @@ -75,7 +75,7 @@ type consumer struct { storeLocationBucket string batchMaxItems int batchMaxInterval time.Duration - consumerLockName string + consumerLockName []string // perDateFileParquetWriter returns file writer per date. // Added in config to allow testing. @@ -256,7 +256,7 @@ func (c *consumer) runContinuouslyOnSingleAuth(ctx context.Context, eventsProces err := backend.RunWhileLocked(ctx, backend.RunWhileLockedConfig{ LockConfiguration: backend.LockConfiguration{ Backend: c.backend, - LockNameComponents: []string{c.consumerLockName}, + LockNameComponents: c.consumerLockName, // TTL is higher then batchMaxInterval because we want to optimize // for low backend writes. TTL: 5 * c.batchMaxInterval, diff --git a/lib/events/athena/consumer_test.go b/lib/events/athena/consumer_test.go index 254bf8e27c0cb..25f57b8ddbc26 100644 --- a/lib/events/athena/consumer_test.go +++ b/lib/events/athena/consumer_test.go @@ -417,7 +417,7 @@ func TestConsumerRunContinuouslyOnSingleAuth(t *testing.T) { defer backend.Close() batchInterval := 20 * time.Millisecond - lockName := "athena_lock" + lockName := []string{"athena_lock"} c1 := consumer{ logger: log, From 13ecfe723e2c73f353199583bf12cdf07196d216 Mon Sep 17 00:00:00 2001 From: Leszek Charkiewicz Date: Fri, 20 Sep 2024 21:39:15 +0200 Subject: [PATCH 4/8] Add a test for disabled consumer --- lib/events/athena/athena.go | 22 ++++++++++++++-------- lib/service/service_test.go | 10 ++++++++++ 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/lib/events/athena/athena.go b/lib/events/athena/athena.go index dda1d9e541134..1edead1f52740 100644 --- a/lib/events/athena/athena.go +++ b/lib/events/athena/athena.go @@ -501,20 +501,22 @@ func New(ctx context.Context, cfg Config) (*Log, error) { return nil, trace.Wrap(err) } - consumerCtx, consumerCancel := context.WithCancel(ctx) - - consumer, err := newConsumer(cfg, consumerCancel) - if err != nil { - return nil, trace.Wrap(err) - } - l := &Log{ publisher: newPublisherFromAthenaConfig(cfg), querier: querier, - consumerCloser: consumer, + consumerCloser: nil, } if !cfg.ConsumerDisabled { + consumerCtx, consumerCancel := context.WithCancel(ctx) + + consumer, err := newConsumer(cfg, consumerCancel) + if err != nil { + return nil, trace.Wrap(err) + } + + l.consumerCloser = consumer + go consumer.run(consumerCtx) } @@ -546,6 +548,10 @@ func (l *Log) Close() error { return trace.Wrap(l.consumerCloser.Close()) } +func (l *Log) IsConsumerDisabled() bool { + return l.consumerCloser == nil +} + var isAlphanumericOrUnderscoreRe = regexp.MustCompile("^[a-zA-Z0-9_]+$") func isAlphanumericOrUnderscore(s string) bool { diff --git a/lib/service/service_test.go b/lib/service/service_test.go index 9162d5b46c686..9613bc5bea479 100644 --- a/lib/service/service_test.go +++ b/lib/service/service_test.go @@ -530,6 +530,16 @@ func TestAthenaAuditLogSetup(t *testing.T) { require.True(t, ok, "invalid logger type, got %T", v) }, }, + { + name: "valid athena config with disabled consumer", + uris: []string{sampleAthenaURI + "&consumerDisabled=false"}, + externalAudit: externalAuditStorageDisabled, + wantFn: func(t *testing.T, alog events.AuditLogger) { + v, ok := alog.(*athena.Log) + require.True(t, ok, "invalid logger type, got %T", v) + require.True(t, v.IsConsumerDisabled(), "consumer is not disabled") + }, + }, { name: "config with rate limit - should use events.SearchEventsLimiter", uris: []string{sampleAthenaURI + "&limiterRefillAmount=3&limiterBurst=2"}, From 10ecd68c68ee833fac1b075683ca5f15a80520a3 Mon Sep 17 00:00:00 2001 From: Leszek Charkiewicz Date: Fri, 20 Sep 2024 21:40:34 +0200 Subject: [PATCH 5/8] Fix URI in disabled consumer test --- lib/service/service_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/service/service_test.go b/lib/service/service_test.go index 9613bc5bea479..a735e325d1e9a 100644 --- a/lib/service/service_test.go +++ b/lib/service/service_test.go @@ -532,7 +532,7 @@ func TestAthenaAuditLogSetup(t *testing.T) { }, { name: "valid athena config with disabled consumer", - uris: []string{sampleAthenaURI + "&consumerDisabled=false"}, + uris: []string{sampleAthenaURI + "&consumerDisabled=true"}, externalAudit: externalAuditStorageDisabled, wantFn: func(t *testing.T, alog events.AuditLogger) { v, ok := alog.(*athena.Log) From 9da8123802f36d51eda472995f7af79dca049ba2 Mon Sep 17 00:00:00 2001 From: Leszek Charkiewicz Date: Tue, 1 Oct 2024 15:07:30 +0200 Subject: [PATCH 6/8] Address feedback about ConsumerLockName being a single string --- lib/events/athena/athena.go | 10 +++------- lib/events/athena/athena_test.go | 6 +++--- lib/events/athena/consumer.go | 8 ++++++-- lib/events/athena/consumer_test.go | 3 --- 4 files changed, 12 insertions(+), 15 deletions(-) diff --git a/lib/events/athena/athena.go b/lib/events/athena/athena.go index 1edead1f52740..ecff846766325 100644 --- a/lib/events/athena/athena.go +++ b/lib/events/athena/athena.go @@ -123,9 +123,9 @@ type Config struct { // BatchMaxInterval defined interval at which parquet files will be created (optional). BatchMaxInterval time.Duration // ConsumerLockName defines a name of a SQS consumer lock (optional). - // If provided, it will be prefixed with "athena_" to avoid accidental + // If provided, it will be prefixed with "athena/" to avoid accidental // collision with existing locks. - ConsumerLockName []string + ConsumerLockName string // ConsumerDisabled defines if SQS consumer should be disabled (optional). ConsumerDisabled bool @@ -253,10 +253,6 @@ func (cfg *Config) CheckAndSetDefaults(ctx context.Context) error { return trace.BadParameter("BatchMaxInterval too short, must be greater than 5s") } - if len(cfg.ConsumerLockName) == 0 { - cfg.ConsumerLockName = []string{"athena_lock"} - } - if cfg.LimiterRefillAmount < 0 { return trace.BadParameter("LimiterRefillAmount cannot be nagative") } @@ -425,7 +421,7 @@ func (cfg *Config) SetFromURL(url *url.URL) error { cfg.BatchMaxInterval = dur } if consumerLockName := url.Query().Get("consumerLockName"); consumerLockName != "" { - cfg.ConsumerLockName = []string{"athena", consumerLockName} + cfg.ConsumerLockName = consumerLockName } if val := url.Query().Get("consumerDisabled"); val != "" { boolVal, err := strconv.ParseBool(val) diff --git a/lib/events/athena/athena_test.go b/lib/events/athena/athena_test.go index c07f4a0769dcc..baa4430db6a0d 100644 --- a/lib/events/athena/athena_test.go +++ b/lib/events/athena/athena_test.go @@ -96,7 +96,7 @@ func TestConfig_SetFromURL(t *testing.T) { QueueURL: "https://queueURL", BatchMaxItems: 1000, BatchMaxInterval: 10 * time.Second, - ConsumerLockName: []string{"athena", "mylock"}, + ConsumerLockName: "mylock", ConsumerDisabled: true, }, }, @@ -185,7 +185,7 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) { GetQueryResultsInterval: 100 * time.Millisecond, BatchMaxItems: 20000, BatchMaxInterval: 1 * time.Minute, - ConsumerLockName: []string{"athena_lock"}, + ConsumerLockName: "", PublisherConsumerAWSConfig: dummyAWSCfg, StorerQuerierAWSConfig: dummyAWSCfg, Backend: mockBackend{}, @@ -211,7 +211,7 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) { GetQueryResultsInterval: 100 * time.Millisecond, BatchMaxItems: 20000, BatchMaxInterval: 1 * time.Minute, - ConsumerLockName: []string{"athena_lock"}, + ConsumerLockName: "", PublisherConsumerAWSConfig: dummyAWSCfg, StorerQuerierAWSConfig: dummyAWSCfg, Backend: mockBackend{}, diff --git a/lib/events/athena/consumer.go b/lib/events/athena/consumer.go index b5228608dfd4b..2b0417ec07a44 100644 --- a/lib/events/athena/consumer.go +++ b/lib/events/athena/consumer.go @@ -75,7 +75,7 @@ type consumer struct { storeLocationBucket string batchMaxItems int batchMaxInterval time.Duration - consumerLockName []string + consumerLockName string // perDateFileParquetWriter returns file writer per date. // Added in config to allow testing. @@ -253,10 +253,14 @@ func (c *consumer) runContinuouslyOnSingleAuth(ctx context.Context, eventsProces case <-ctx.Done(): return default: + lockName := []string{"athena", c.consumerLockName} + if c.consumerLockName == "" { + lockName = []string{"athena_lock"} + } err := backend.RunWhileLocked(ctx, backend.RunWhileLockedConfig{ LockConfiguration: backend.LockConfiguration{ Backend: c.backend, - LockNameComponents: c.consumerLockName, + LockNameComponents: lockName, // TTL is higher then batchMaxInterval because we want to optimize // for low backend writes. TTL: 5 * c.batchMaxInterval, diff --git a/lib/events/athena/consumer_test.go b/lib/events/athena/consumer_test.go index 25f57b8ddbc26..16831aac6ff7e 100644 --- a/lib/events/athena/consumer_test.go +++ b/lib/events/athena/consumer_test.go @@ -417,19 +417,16 @@ func TestConsumerRunContinuouslyOnSingleAuth(t *testing.T) { defer backend.Close() batchInterval := 20 * time.Millisecond - lockName := []string{"athena_lock"} c1 := consumer{ logger: log, backend: backend, batchMaxInterval: batchInterval, - consumerLockName: lockName, } c2 := consumer{ logger: log, backend: backend, batchMaxInterval: batchInterval, - consumerLockName: lockName, } m1 := mockEventsProcessor{interval: batchInterval} m2 := mockEventsProcessor{interval: batchInterval} From 7009d98339a3ae05b5e1c8bce790226901a162c0 Mon Sep 17 00:00:00 2001 From: Leszek Charkiewicz Date: Tue, 1 Oct 2024 15:32:52 +0200 Subject: [PATCH 7/8] Update lib/events/athena/athena.go Co-authored-by: rosstimothy <39066650+rosstimothy@users.noreply.github.com> --- lib/events/athena/athena.go | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/events/athena/athena.go b/lib/events/athena/athena.go index ecff846766325..053e8afbb4acb 100644 --- a/lib/events/athena/athena.go +++ b/lib/events/athena/athena.go @@ -500,7 +500,6 @@ func New(ctx context.Context, cfg Config) (*Log, error) { l := &Log{ publisher: newPublisherFromAthenaConfig(cfg), querier: querier, - consumerCloser: nil, } if !cfg.ConsumerDisabled { From 19cf4cca8e4de3a0185e4a66985c35c9fe57c537 Mon Sep 17 00:00:00 2001 From: Leszek Charkiewicz Date: Tue, 1 Oct 2024 15:56:27 +0200 Subject: [PATCH 8/8] Make linter happy --- lib/events/athena/athena.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/events/athena/athena.go b/lib/events/athena/athena.go index 053e8afbb4acb..5b6a30f6bab72 100644 --- a/lib/events/athena/athena.go +++ b/lib/events/athena/athena.go @@ -498,8 +498,8 @@ func New(ctx context.Context, cfg Config) (*Log, error) { } l := &Log{ - publisher: newPublisherFromAthenaConfig(cfg), - querier: querier, + publisher: newPublisherFromAthenaConfig(cfg), + querier: querier, } if !cfg.ConsumerDisabled {