From 69093ed8b0cb8af831f26461f8c19a95f85daaad Mon Sep 17 00:00:00 2001 From: Edoardo Spadolini Date: Mon, 7 Apr 2025 22:43:04 +0200 Subject: [PATCH 1/2] Wait a full second between refreshShards calls --- lib/backend/dynamo/shards.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/lib/backend/dynamo/shards.go b/lib/backend/dynamo/shards.go index 741501101971e..47ca659af1998 100644 --- a/lib/backend/dynamo/shards.go +++ b/lib/backend/dynamo/shards.go @@ -152,8 +152,8 @@ func (b *Backend) pollStreams(externalCtx context.Context) error { b.buf.SetInit() defer b.buf.Reset() - ticker := time.NewTicker(b.PollStreamPeriod) - defer ticker.Stop() + timer := time.NewTimer(b.PollStreamPeriod) + defer timer.Stop() for { select { @@ -174,10 +174,11 @@ func (b *Backend) pollStreams(externalCtx context.Context) error { } else { b.buf.Emit(event.events...) } - case <-ticker.C: + case <-timer.C: if err := refreshShards(false); err != nil { return trace.Wrap(err) } + timer.Reset(b.PollStreamPeriod) case <-ctx.Done(): b.logger.Log(ctx, logutils.TraceLevel, "Context is closing, returning.") return nil From a32fd02c0e0e21ed60cfccc2964af51fd4dd8f0f Mon Sep 17 00:00:00 2001 From: Edoardo Spadolini Date: Mon, 7 Apr 2025 22:43:20 +0200 Subject: [PATCH 2/2] Wait 200ms between DescribeStream calls --- lib/backend/dynamo/shards.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/lib/backend/dynamo/shards.go b/lib/backend/dynamo/shards.go index 47ca659af1998..15dd4445b73d8 100644 --- a/lib/backend/dynamo/shards.go +++ b/lib/backend/dynamo/shards.go @@ -282,6 +282,12 @@ func (b *Backend) collectActiveShards(ctx context.Context, streamArn *string) ([ return filterActiveShards(out), nil } input.ExclusiveStartShardId = streamInfo.StreamDescription.LastEvaluatedShardId + select { + case <-ctx.Done(): + // let the next call deal with the context error + case <-time.After(200 * time.Millisecond): + // 10 calls per second with two auths, with ample margin + } } }