diff --git a/lib/backend/dynamo/shards.go b/lib/backend/dynamo/shards.go index 741501101971e..15dd4445b73d8 100644 --- a/lib/backend/dynamo/shards.go +++ b/lib/backend/dynamo/shards.go @@ -152,8 +152,8 @@ func (b *Backend) pollStreams(externalCtx context.Context) error { b.buf.SetInit() defer b.buf.Reset() - ticker := time.NewTicker(b.PollStreamPeriod) - defer ticker.Stop() + timer := time.NewTimer(b.PollStreamPeriod) + defer timer.Stop() for { select { @@ -174,10 +174,11 @@ func (b *Backend) pollStreams(externalCtx context.Context) error { } else { b.buf.Emit(event.events...) } - case <-ticker.C: + case <-timer.C: if err := refreshShards(false); err != nil { return trace.Wrap(err) } + timer.Reset(b.PollStreamPeriod) case <-ctx.Done(): b.logger.Log(ctx, logutils.TraceLevel, "Context is closing, returning.") return nil @@ -281,6 +282,12 @@ func (b *Backend) collectActiveShards(ctx context.Context, streamArn *string) ([ return filterActiveShards(out), nil } input.ExclusiveStartShardId = streamInfo.StreamDescription.LastEvaluatedShardId + select { + case <-ctx.Done(): + // let the next call deal with the context error + case <-time.After(200 * time.Millisecond): + // 10 calls per second with two auths, with ample margin + } } }