diff --git a/lib/backend/dynamo/shards.go b/lib/backend/dynamo/shards.go index 52641b6208dde..54c972274b892 100644 --- a/lib/backend/dynamo/shards.go +++ b/lib/backend/dynamo/shards.go @@ -150,8 +150,8 @@ func (b *Backend) pollStreams(externalCtx context.Context) error { b.buf.SetInit() defer b.buf.Reset() - ticker := time.NewTicker(b.PollStreamPeriod) - defer ticker.Stop() + timer := time.NewTimer(b.PollStreamPeriod) + defer timer.Stop() for { select { @@ -172,10 +172,11 @@ func (b *Backend) pollStreams(externalCtx context.Context) error { } else { b.buf.Emit(event.events...) } - case <-ticker.C: + case <-timer.C: if err := refreshShards(false); err != nil { return trace.Wrap(err) } + timer.Reset(b.PollStreamPeriod) case <-ctx.Done(): b.Tracef("Context is closing, returning.") return nil @@ -279,6 +280,12 @@ func (b *Backend) collectActiveShards(ctx context.Context, streamArn *string) ([ return filterActiveShards(out), nil } input.ExclusiveStartShardId = streamInfo.StreamDescription.LastEvaluatedShardId + select { + case <-ctx.Done(): + // let the next call deal with the context error + case <-time.After(200 * time.Millisecond): + // 10 calls per second with two auths, with ample margin + } } }