Skip to content

Commit 04f5a52

Browse files
committed
Replacing retry goroutines with iterators
1 parent efd53f1 commit 04f5a52

File tree

5 files changed

+169
-42
lines changed

5 files changed

+169
-42
lines changed

api-bucket-notification.go

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -157,13 +157,6 @@ func (c *Client) ListenBucketNotification(ctx context.Context, bucketName, prefi
157157
return
158158
}
159159

160-
// Continuously run and listen on bucket notification.
161-
// Create a done channel to control 'ListObjects' go routine.
162-
retryDoneCh := make(chan struct{}, 1)
163-
164-
// Indicate to our routine to exit cleanly upon return.
165-
defer close(retryDoneCh)
166-
167160
// Prepare urlValues to pass into the request on every loop
168161
urlValues := make(url.Values)
169162
urlValues.Set("ping", "10")
@@ -172,7 +165,7 @@ func (c *Client) ListenBucketNotification(ctx context.Context, bucketName, prefi
172165
urlValues["events"] = events
173166

174167
// Wait on the jitter retry loop.
175-
for range c.newRetryTimerContinous(time.Second, time.Second*30, MaxJitter, retryDoneCh) {
168+
for range c.newRetryTimerContinous(time.Second, time.Second*30, MaxJitter) {
176169
// Execute GET on bucket to list objects.
177170
resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{
178171
bucketName: bucketName,

api.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -660,13 +660,7 @@ func (c *Client) executeMethod(ctx context.Context, method string, metadata requ
660660
metadata.trailer.Set(metadata.addCrc.Key(), base64.StdEncoding.EncodeToString(crc.Sum(nil)))
661661
}
662662

663-
// Create cancel context to control 'newRetryTimer' go routine.
664-
retryCtx, cancel := context.WithCancel(ctx)
665-
666-
// Indicate to our routine to exit cleanly upon return.
667-
defer cancel()
668-
669-
for range c.newRetryTimer(retryCtx, reqRetry, DefaultRetryUnit, DefaultRetryCap, MaxJitter) {
663+
for range c.newRetryTimer(ctx, reqRetry, DefaultRetryUnit, DefaultRetryCap, MaxJitter) {
670664
// Retry executes the following function body if request has an
671665
// error until maxRetries have been exhausted, retry attempts are
672666
// performed after waiting for a given period of time in a
@@ -779,7 +773,7 @@ func (c *Client) executeMethod(ctx context.Context, method string, metadata requ
779773
}
780774

781775
// Return an error when retry is canceled or deadlined
782-
if e := retryCtx.Err(); e != nil {
776+
if e := ctx.Err(); e != nil {
783777
return nil, e
784778
}
785779

retry-continous.go

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@
1717

1818
package minio
1919

20-
import "time"
20+
import (
21+
"iter"
22+
"math"
23+
"time"
24+
)
2125

2226
// newRetryTimerContinous creates a timer with exponentially increasing delays forever.
23-
func (c *Client) newRetryTimerContinous(baseSleep, maxSleep time.Duration, jitter float64, doneCh chan struct{}) <-chan int {
24-
attemptCh := make(chan int)
25-
27+
func (c *Client) newRetryTimerContinous(baseSleep, maxSleep time.Duration, jitter float64) iter.Seq[int] {
2628
// normalize jitter to the range [0, 1.0]
2729
if jitter < NoJitter {
2830
jitter = NoJitter
@@ -44,26 +46,20 @@ func (c *Client) newRetryTimerContinous(baseSleep, maxSleep time.Duration, jitte
4446
if sleep > maxSleep {
4547
sleep = maxSleep
4648
}
47-
if jitter != NoJitter {
49+
if math.Abs(jitter-NoJitter) > 1e-9 {
4850
sleep -= time.Duration(c.random.Float64() * float64(sleep) * jitter)
4951
}
5052
return sleep
5153
}
5254

53-
go func() {
54-
defer close(attemptCh)
55+
return func(yield func(int) bool) {
5556
var nextBackoff int
5657
for {
57-
select {
58-
// Attempts starts.
59-
case attemptCh <- nextBackoff:
60-
nextBackoff++
61-
case <-doneCh:
62-
// Stop the routine.
58+
if !yield(nextBackoff) {
6359
return
6460
}
61+
nextBackoff++
6562
time.Sleep(exponentialBackoffWait(nextBackoff))
6663
}
67-
}()
68-
return attemptCh
64+
}
6965
}

retry.go

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
"context"
2222
"crypto/x509"
2323
"errors"
24+
"iter"
25+
"math"
2426
"net/http"
2527
"net/url"
2628
"time"
@@ -45,8 +47,7 @@ var DefaultRetryCap = time.Second
4547

4648
// newRetryTimer creates a timer with exponentially increasing
4749
// delays until the maximum retry attempts are reached.
48-
func (c *Client) newRetryTimer(ctx context.Context, maxRetry int, baseSleep, maxSleep time.Duration, jitter float64) <-chan int {
49-
attemptCh := make(chan int)
50+
func (c *Client) newRetryTimer(ctx context.Context, maxRetry int, baseSleep, maxSleep time.Duration, jitter float64) iter.Seq[int] {
5051

5152
// computes the exponential backoff duration according to
5253
// https://www.awsarchitectureblog.com/2015/03/backoff.html
@@ -64,18 +65,22 @@ func (c *Client) newRetryTimer(ctx context.Context, maxRetry int, baseSleep, max
6465
if sleep > maxSleep {
6566
sleep = maxSleep
6667
}
67-
if jitter != NoJitter {
68+
if math.Abs(jitter-NoJitter) > 1e-9 {
6869
sleep -= time.Duration(c.random.Float64() * float64(sleep) * jitter)
6970
}
7071
return sleep
7172
}
7273

73-
go func() {
74-
defer close(attemptCh)
75-
for i := 0; i < maxRetry; i++ {
76-
select {
77-
case attemptCh <- i + 1:
78-
case <-ctx.Done():
74+
return func(yield func(int) bool) {
75+
// if context is already cancelled, skip yield
76+
select {
77+
case <-ctx.Done():
78+
return
79+
default:
80+
}
81+
82+
for i := range maxRetry {
83+
if !yield(i) {
7984
return
8085
}
8186

@@ -85,8 +90,7 @@ func (c *Client) newRetryTimer(ctx context.Context, maxRetry int, baseSleep, max
8590
return
8691
}
8792
}
88-
}()
89-
return attemptCh
93+
}
9094
}
9195

9296
// List of AWS S3 error codes which are retryable.

retry_test.go

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package minio
2+
3+
import (
4+
"context"
5+
"math/rand"
6+
"testing"
7+
"time"
8+
)
9+
10+
func TestRetryTimer(t *testing.T) {
11+
12+
t.Run("withLimit", func(t *testing.T) {
13+
t.Parallel()
14+
c := &Client{random: rand.New(rand.NewSource(42))}
15+
ctx := context.Background()
16+
var count int
17+
for range c.newRetryTimer(ctx, 3, time.Millisecond, 10*time.Millisecond, 0.0) {
18+
count++
19+
}
20+
if count != 3 {
21+
t.Errorf("expected exactly 3 yields")
22+
}
23+
})
24+
25+
t.Run("checkDelay", func(t *testing.T) {
26+
t.Parallel()
27+
c := &Client{random: rand.New(rand.NewSource(42))}
28+
ctx := context.Background()
29+
prev := time.Now()
30+
baseSleep := time.Millisecond
31+
maxSleep := 10 * time.Millisecond
32+
for i := range c.newRetryTimer(ctx, 6, baseSleep, maxSleep, 0.0) {
33+
if i == 0 {
34+
// there is no sleep for the first execution
35+
if time.Since(prev) >= time.Millisecond {
36+
t.Errorf("expected to not sleep for the first instance of the loop")
37+
}
38+
prev = time.Now()
39+
continue
40+
}
41+
expect := baseSleep * time.Duration(1<<uint(i-1))
42+
expect = min(expect, maxSleep)
43+
if d := time.Since(prev); d < expect || d > 2*maxSleep {
44+
t.Errorf("expected to sleep for at least %s", expect.String())
45+
}
46+
prev = time.Now()
47+
}
48+
})
49+
50+
t.Run("withBreak", func(t *testing.T) {
51+
t.Parallel()
52+
c := &Client{random: rand.New(rand.NewSource(42))}
53+
ctx := context.Background()
54+
var count int
55+
for range c.newRetryTimer(ctx, 10, time.Millisecond, 10*time.Millisecond, 0.5) {
56+
count++
57+
if count >= 3 {
58+
break
59+
}
60+
}
61+
if count != 3 {
62+
t.Errorf("expected exactly 3 yields")
63+
}
64+
})
65+
66+
t.Run("withCancelledContext", func(t *testing.T) {
67+
t.Parallel()
68+
c := &Client{random: rand.New(rand.NewSource(42))}
69+
ctx := context.Background()
70+
ctx, cancel := context.WithCancel(ctx)
71+
cancel()
72+
var count int
73+
for range c.newRetryTimer(ctx, 10, time.Millisecond, 10*time.Millisecond, 0.5) {
74+
count++
75+
}
76+
if count != 0 {
77+
t.Errorf("expected no yields")
78+
}
79+
})
80+
t.Run("whileCancelledContext", func(t *testing.T) {
81+
t.Parallel()
82+
c := &Client{random: rand.New(rand.NewSource(42))}
83+
ctx := context.Background()
84+
ctx, cancel := context.WithCancel(ctx)
85+
var count int
86+
for range c.newRetryTimer(ctx, 10, time.Millisecond, 10*time.Millisecond, 0.5) {
87+
count++
88+
cancel()
89+
}
90+
cancel()
91+
if count != 1 {
92+
t.Errorf("expected only one yield")
93+
}
94+
})
95+
}
96+
97+
func TestRetryContinuous(t *testing.T) {
98+
99+
t.Run("checkDelay", func(t *testing.T) {
100+
t.Parallel()
101+
c := &Client{random: rand.New(rand.NewSource(42))}
102+
prev := time.Now()
103+
baseSleep := time.Millisecond
104+
maxSleep := 10 * time.Millisecond
105+
for i := range c.newRetryTimerContinous(time.Millisecond, 10*time.Millisecond, 0.0) {
106+
if i == 0 {
107+
// there is no sleep for the first execution
108+
if time.Since(prev) >= time.Millisecond {
109+
t.Errorf("expected to not sleep for the first instance of the loop")
110+
}
111+
prev = time.Now()
112+
continue
113+
}
114+
expect := baseSleep * time.Duration(1<<uint(i-1))
115+
expect = min(expect, maxSleep)
116+
if d := time.Since(prev); d < expect || d > 2*maxSleep {
117+
t.Errorf("expected to sleep for at least %s", expect.String())
118+
}
119+
prev = time.Now()
120+
if i >= 10 {
121+
break
122+
}
123+
}
124+
})
125+
126+
t.Run("withBreak", func(t *testing.T) {
127+
t.Parallel()
128+
c := &Client{random: rand.New(rand.NewSource(42))}
129+
var count int
130+
for range c.newRetryTimerContinous(time.Millisecond, 10*time.Millisecond, 0.5) {
131+
count++
132+
if count >= 3 {
133+
break
134+
}
135+
}
136+
if count != 3 {
137+
t.Errorf("expected exactly 3 yields")
138+
}
139+
})
140+
}

0 commit comments

Comments
 (0)