Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 34 additions & 11 deletions lib/events/athena/athena.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ type Config struct {
BatchMaxItems int
// BatchMaxInterval defined interval at which parquet files will be created (optional).
BatchMaxInterval time.Duration
// ConsumerLockName defines a name of a SQS consumer lock (optional).
// If provided, it will be prefixed with "athena/" to avoid accidental
// collision with existing locks.
ConsumerLockName string
// ConsumerDisabled defines if SQS consumer should be disabled (optional).
ConsumerDisabled bool

// Clock is a clock interface, used in tests.
Clock clockwork.Clock
Expand Down Expand Up @@ -417,6 +423,16 @@ func (cfg *Config) SetFromURL(url *url.URL) error {
}
cfg.BatchMaxInterval = dur
}
if consumerLockName := url.Query().Get("consumerLockName"); consumerLockName != "" {
cfg.ConsumerLockName = consumerLockName
}
if val := url.Query().Get("consumerDisabled"); val != "" {
boolVal, err := strconv.ParseBool(val)
if err != nil {
return trace.BadParameter("invalid consumerDisabled value: %v", err)
}
cfg.ConsumerDisabled = boolVal
}

return nil
}
Expand Down Expand Up @@ -484,20 +500,23 @@ func New(ctx context.Context, cfg Config) (*Log, error) {
return nil, trace.Wrap(err)
}

consumerCtx, consumerCancel := context.WithCancel(ctx)

consumer, err := newConsumer(cfg, consumerCancel)
if err != nil {
return nil, trace.Wrap(err)
}

l := &Log{
publisher: newPublisherFromAthenaConfig(cfg),
querier: querier,
consumerCloser: consumer,
publisher: newPublisherFromAthenaConfig(cfg),
querier: querier,
}

go consumer.run(consumerCtx)
if !cfg.ConsumerDisabled {
consumerCtx, consumerCancel := context.WithCancel(ctx)

consumer, err := newConsumer(cfg, consumerCancel)
if err != nil {
return nil, trace.Wrap(err)
}

l.consumerCloser = consumer

go consumer.run(consumerCtx)
}

return l, nil
}
Expand Down Expand Up @@ -527,6 +546,10 @@ func (l *Log) Close() error {
return trace.Wrap(l.consumerCloser.Close())
}

func (l *Log) IsConsumerDisabled() bool {
return l.consumerCloser == nil
}

var isAlphanumericOrUnderscoreRe = regexp.MustCompile("^[a-zA-Z0-9_]+$")

func isAlphanumericOrUnderscore(s string) bool {
Expand Down
6 changes: 5 additions & 1 deletion lib/events/athena/athena_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,15 @@ func TestConfig_SetFromURL(t *testing.T) {
},
{
name: "params to batcher",
url: "athena://db.tbl/?queueURL=https://queueURL&batchMaxItems=1000&batchMaxInterval=10s",
url: "athena://db.tbl/?queueURL=https://queueURL&batchMaxItems=1000&batchMaxInterval=10s&consumerLockName=mylock&consumerDisabled=true",
want: Config{
TableName: "tbl",
Database: "db",
QueueURL: "https://queueURL",
BatchMaxItems: 1000,
BatchMaxInterval: 10 * time.Second,
ConsumerLockName: "mylock",
ConsumerDisabled: true,
},
},
{
Expand Down Expand Up @@ -187,6 +189,7 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) {
GetQueryResultsInterval: 100 * time.Millisecond,
BatchMaxItems: 20000,
BatchMaxInterval: 1 * time.Minute,
ConsumerLockName: "",
PublisherConsumerAWSConfig: dummyAWSCfg,
StorerQuerierAWSConfig: dummyAWSCfg,
Backend: mockBackend{},
Expand All @@ -212,6 +215,7 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) {
GetQueryResultsInterval: 100 * time.Millisecond,
BatchMaxItems: 20000,
BatchMaxInterval: 1 * time.Minute,
ConsumerLockName: "",
PublisherConsumerAWSConfig: dummyAWSCfg,
StorerQuerierAWSConfig: dummyAWSCfg,
Backend: mockBackend{},
Expand Down
8 changes: 7 additions & 1 deletion lib/events/athena/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type consumer struct {
storeLocationBucket string
batchMaxItems int
batchMaxInterval time.Duration
consumerLockName string

// perDateFileParquetWriter returns file writer per date.
// Added in config to allow testing.
Expand Down Expand Up @@ -157,6 +158,7 @@ func newConsumer(cfg Config, cancelFn context.CancelFunc) (*consumer, error) {
storeLocationBucket: cfg.locationS3Bucket,
batchMaxItems: cfg.BatchMaxItems,
batchMaxInterval: cfg.BatchMaxInterval,
consumerLockName: cfg.ConsumerLockName,
collectConfig: collectCfg,
sqsDeleter: sqsClient,
queueURL: cfg.QueueURL,
Expand Down Expand Up @@ -255,10 +257,14 @@ func (c *consumer) runContinuouslyOnSingleAuth(ctx context.Context, eventsProces
case <-ctx.Done():
return
default:
lockName := []string{"athena", c.consumerLockName}
if c.consumerLockName == "" {
lockName = []string{"athena_lock"}
}
err := backend.RunWhileLocked(ctx, backend.RunWhileLockedConfig{
LockConfiguration: backend.LockConfiguration{
Backend: c.backend,
LockNameComponents: []string{"athena_lock"},
LockNameComponents: lockName,
// TTL is higher then batchMaxInterval because we want to optimize
// for low backend writes.
TTL: 5 * c.batchMaxInterval,
Expand Down
10 changes: 10 additions & 0 deletions lib/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,16 @@ func TestAthenaAuditLogSetup(t *testing.T) {
require.True(t, ok, "invalid logger type, got %T", v)
},
},
{
name: "valid athena config with disabled consumer",
uris: []string{sampleAthenaURI + "&consumerDisabled=true"},
externalAudit: externalAuditStorageDisabled,
wantFn: func(t *testing.T, alog events.AuditLogger) {
v, ok := alog.(*athena.Log)
require.True(t, ok, "invalid logger type, got %T", v)
require.True(t, v.IsConsumerDisabled(), "consumer is not disabled")
},
},
{
name: "config with rate limit - should use events.SearchEventsLimiter",
uris: []string{sampleAthenaURI + "&limiterRefillAmount=3&limiterBurst=2"},
Expand Down