Skip to content

Commit 85b7837

Browse files
p0lyn0mial authored and bertinatto committed
UPSTREAM: <carry>: retry etcd Unavailable errors
This commit renews openshift#327 What has changed compared to the original PR is: - The retryClient interface has been adapted to storage.Interface. - The isRetriableEtcdError method has been completely changed; it seems that previously the error we wanted to retry was not being retried. Even the unit tests were failing. Overall, I still think this is not the correct fix. The proper fix should be added to the etcd client. UPSTREAM: <carry>: retry etcd Unavailable errors This is the second commit for the retry logic. This commit adds unit tests and slightly improves the logging. During a rebase squash with the previous one. UPSTREAM: <carry>: retry_etcdclient: expose retry logic functionality during rebase merge with: UPSTREAM: <carry>: retry etcd Unavailable errors UPSTREAM: <carry>: Don't retry storage calls with side effects. The existing patch retried any etcd error returned from storage with the code "Unavailable". Writes can only be safely retried if the client can be absolutely sure that the initial attempt ended before persisting any changes. The "Unavailable" code includes errors like "timed out" that can't be safely retried for writes. UPSTREAM: <carry>: Add retries for GetCurrentResourceVersion. UPSTREAM: <carry>: squash: storage interface underlying the retryClient has changed Removed methods: - Count Added methods: - Stats - SetKeysFunc - CompactRevision
1 parent 35c9e69 commit 85b7837

File tree

4 files changed

+496
-2
lines changed

4 files changed

+496
-2
lines changed
Lines changed: 287 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,287 @@
1+
package etcd3retry
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"regexp"
7+
"strings"
8+
"time"
9+
10+
etcdrpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
11+
"google.golang.org/grpc/codes"
12+
13+
"k8s.io/apimachinery/pkg/runtime"
14+
"k8s.io/apimachinery/pkg/util/wait"
15+
"k8s.io/apimachinery/pkg/watch"
16+
"k8s.io/apiserver/pkg/storage"
17+
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
18+
"k8s.io/klog/v2"
19+
)
20+
21+
var DefaultRetry = wait.Backoff{
22+
Duration: 300 * time.Millisecond,
23+
Factor: 2, // double the timeout for every failure
24+
Jitter: 0.1,
25+
Steps: 6, // .3 + .6 + 1.2 + 2.4 + 4.8 = 10ish this lets us smooth out short bumps but not long ones and keeps retry behavior closer.
26+
}
27+
28+
type retryClient struct {
29+
// All methods of storage.Interface are implemented directly on *retryClient, even when they
30+
// are purely passthroughs to the delegate. During a rebase, consider whether or not it is
31+
// safe and appropriate for a new method added to the method set of storage.Interface to
32+
// perform retries.
33+
delegate storage.Interface
34+
}
35+
36+
func (c *retryClient) Stats(ctx context.Context) (storage.Stats, error) {
37+
return c.delegate.Stats(ctx)
38+
}
39+
40+
func (c *retryClient) SetKeysFunc(f storage.KeysFunc) {
41+
c.delegate.SetKeysFunc(f)
42+
}
43+
44+
func (c *retryClient) CompactRevision() int64 {
45+
return c.delegate.CompactRevision()
46+
}
47+
48+
func (c *retryClient) ReadinessCheck() error {
49+
return c.delegate.ReadinessCheck()
50+
}
51+
52+
func (c *retryClient) RequestWatchProgress(ctx context.Context) error {
53+
return c.delegate.RequestWatchProgress(ctx)
54+
}
55+
56+
func (c *retryClient) Versioner() storage.Versioner {
57+
return c.delegate.Versioner()
58+
}
59+
60+
// New returns an etcd3 implementation of storage.Interface.
61+
func NewRetryingEtcdStorage(delegate storage.Interface) storage.Interface {
62+
return &retryClient{delegate: delegate}
63+
}
64+
65+
func (c *retryClient) GetCurrentResourceVersion(ctx context.Context) (uint64, error) {
66+
var (
67+
rv uint64
68+
err error
69+
)
70+
return rv, OnError(ctx, DefaultRetry, IsRetriableErrorOnRead, func() error {
71+
rv, err = c.delegate.GetCurrentResourceVersion(ctx)
72+
return err
73+
})
74+
}
75+
76+
// Create adds a new object at a key unless it already exists. 'ttl' is time-to-live
77+
// in seconds (0 means forever). If no error is returned and out is not nil, out will be
78+
// set to the read value from database.
79+
func (c *retryClient) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
80+
return OnError(ctx, DefaultRetry, IsRetriableErrorOnWrite, func() error {
81+
return c.delegate.Create(ctx, key, obj, out, ttl)
82+
})
83+
}
84+
85+
// Delete removes the specified key and returns the value that existed at that spot.
86+
// If key didn't exist, it will return NotFound storage error.
87+
func (c *retryClient) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object, opts storage.DeleteOptions) error {
88+
return OnError(ctx, DefaultRetry, IsRetriableErrorOnWrite, func() error {
89+
return c.delegate.Delete(ctx, key, out, preconditions, validateDeletion, cachedExistingObject, opts)
90+
})
91+
}
92+
93+
// Watch begins watching the specified key. Events are decoded into API objects,
94+
// and any items selected by 'p' are sent down to returned watch.Interface.
95+
// resourceVersion may be used to specify what version to begin watching,
96+
// which should be the current resourceVersion, and no longer rv+1
97+
// (e.g. reconnecting without missing any updates).
98+
// If resource version is "0", this interface will get current object at given key
99+
// and send it in an "ADDED" event, before watch starts.
100+
func (c *retryClient) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
101+
var ret watch.Interface
102+
err := OnError(ctx, DefaultRetry, IsRetriableErrorOnRead, func() error {
103+
var innerErr error
104+
ret, innerErr = c.delegate.Watch(ctx, key, opts)
105+
return innerErr
106+
})
107+
return ret, err
108+
}
109+
110+
// Get unmarshals json found at key into objPtr. On a not found error, will either
111+
// return a zero object of the requested type, or an error, depending on 'opts.ignoreNotFound'.
112+
// Treats empty responses and nil response nodes exactly like a not found error.
113+
// The returned contents may be delayed, but it is guaranteed that they will
114+
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
115+
func (c *retryClient) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
116+
return OnError(ctx, DefaultRetry, IsRetriableErrorOnRead, func() error {
117+
return c.delegate.Get(ctx, key, opts, objPtr)
118+
})
119+
}
120+
121+
// GetList unmarshalls objects found at key into a *List api object (an object
122+
// that satisfies runtime.IsList definition).
123+
// If 'opts.Recursive' is false, 'key' is used as an exact match. If `opts.Recursive'
124+
// is true, 'key' is used as a prefix.
125+
// The returned contents may be delayed, but it is guaranteed that they will
126+
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
127+
func (c *retryClient) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
128+
return OnError(ctx, DefaultRetry, IsRetriableErrorOnRead, func() error {
129+
return c.delegate.GetList(ctx, key, opts, listObj)
130+
})
131+
}
132+
133+
// GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'destination')
134+
// retrying the update until success if there is index conflict.
135+
// Note that object passed to tryUpdate may change across invocations of tryUpdate() if
136+
// other writers are simultaneously updating it, so tryUpdate() needs to take into account
137+
// the current contents of the object when deciding how the update object should look.
138+
// If the key doesn't exist, it will return NotFound storage error if ignoreNotFound=false
139+
// else `destination` will be set to the zero value of it's type.
140+
// If the eventual successful invocation of `tryUpdate` returns an output with the same serialized
141+
// contents as the input, it won't perform any update, but instead set `destination` to an object with those
142+
// contents.
143+
// If 'cachedExistingObject' is non-nil, it can be used as a suggestion about the
144+
// current version of the object to avoid read operation from storage to get it.
145+
// However, the implementations have to retry in case suggestion is stale.
146+
//
147+
// Example:
148+
//
149+
// s := /* implementation of Interface */
150+
// err := s.GuaranteedUpdate(
151+
//
152+
// "myKey", &MyType{}, true, preconditions,
153+
// func(input runtime.Object, res ResponseMeta) (runtime.Object, *uint64, error) {
154+
// // Before each invocation of the user defined function, "input" is reset to
155+
// // current contents for "myKey" in database.
156+
// curr := input.(*MyType) // Guaranteed to succeed.
157+
//
158+
// // Make the modification
159+
// curr.Counter++
160+
//
161+
// // Return the modified object - return an error to stop iterating. Return
162+
// // a uint64 to alter the TTL on the object, or nil to keep it the same value.
163+
// return cur, nil, nil
164+
// }, cachedExistingObject
165+
//
166+
// )
167+
func (c *retryClient) GuaranteedUpdate(ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool,
168+
preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, cachedExistingObject runtime.Object) error {
169+
return OnError(ctx, DefaultRetry, IsRetriableErrorOnWrite, func() error {
170+
return c.delegate.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, cachedExistingObject)
171+
})
172+
}
173+
174+
// errorLabelsBySuffix maps the trailing text of a storage error message to the short label
// used to classify it (for example in retry metrics and logs).
//
// These errors are coming back from the k8s.io/apiserver storage.Interface, not directly from an
// etcd client. Classifying them can be fragile since the storage methods may not return etcd client
// errors directly.
//
// etcd reports the connection-lost timeout as "... possibly due to connection lost". The
// misspelled "possible due to connection lost" key is kept alongside the correct one so that
// anything already relying on it keeps working.
var errorLabelsBySuffix = map[string]string{
	"etcdserver: leader changed": "LeaderChanged",
	"etcdserver: no leader":      "NoLeader",
	"raft proposal dropped":      "ProposalDropped",

	"etcdserver: request timed out":                                              "Timeout",
	"etcdserver: request timed out, possibly due to previous leader failure":     "Timeout",
	"etcdserver: request timed out, possibly due to connection lost":             "Timeout",
	"etcdserver: request timed out, possible due to connection lost":             "Timeout",
	"etcdserver: request timed out, waiting for the applied index took too long": "Timeout",
	"etcdserver: server stopped":                                                 "Stopped",
}
188+
189+
// retriableWriteErrorSuffixes matches error messages that END with one of the suffixes for
// which a write may be retried. FindString returns the matched suffix, or "" when the
// message does not end with any of them.
var retriableWriteErrorSuffixes = func() *regexp.Regexp {
	// This list should include only errors the caller is certain have no side effects.
	suffixes := []string{
		"etcdserver: leader changed",
		"etcdserver: no leader",
		"raft proposal dropped",
	}
	// The suffixes are literal text, not patterns: escape them so that a suffix containing
	// a regexp metacharacter can never change what is matched.
	quoted := make([]string, 0, len(suffixes))
	for _, suffix := range suffixes {
		quoted = append(quoted, regexp.QuoteMeta(suffix))
	}
	return regexp.MustCompile(fmt.Sprintf(`(%s)$`, strings.Join(quoted, `|`)))
}()
198+
199+
// IsRetriableErrorOnWrite returns true if and only if a retry should be attempted when the provided
200+
// error is returned from a write attempt. If the error is retriable, a non-empty string classifying
201+
// the error is also returned.
202+
func IsRetriableErrorOnWrite(err error) (string, bool) {
203+
if suffix := retriableWriteErrorSuffixes.FindString(err.Error()); suffix != "" {
204+
return errorLabelsBySuffix[suffix], true
205+
}
206+
return "", false
207+
}
208+
209+
var retriableReadErrorSuffixes = func() *regexp.Regexp {
210+
var suffixes []string
211+
for suffix := range errorLabelsBySuffix {
212+
suffixes = append(suffixes, suffix)
213+
}
214+
return regexp.MustCompile(fmt.Sprintf(`(%s)$`, strings.Join(suffixes, `|`)))
215+
}()
216+
217+
// IsRetriableErrorOnRead returns true if and only if a retry should be attempted when the provided
218+
// error is returned from a read attempt. If the error is retriable, a non-empty string classifying
219+
// the error is also returned.
220+
func IsRetriableErrorOnRead(err error) (string, bool) {
221+
if suffix := retriableReadErrorSuffixes.FindString(err.Error()); suffix != "" {
222+
return errorLabelsBySuffix[suffix], true
223+
}
224+
if etcdError, ok := etcdrpc.Error(err).(etcdrpc.EtcdError); ok && etcdError.Code() == codes.Unavailable {
225+
return "Unavailable", true
226+
}
227+
return "", false
228+
}
229+
230+
// OnError allows the caller to retry fn in case the error returned by fn is retriable
231+
// according to the provided function. backoff defines the maximum retries and the wait
232+
// interval between two retries.
233+
func OnError(ctx context.Context, backoff wait.Backoff, retriable func(error) (string, bool), fn func() error) error {
234+
var lastErr error
235+
var lastErrLabel string
236+
var retry bool
237+
var retryCounter int
238+
err := backoffWithRequestContext(ctx, backoff, func() (bool, error) {
239+
err := fn()
240+
if retry {
241+
klog.V(1).Infof("etcd retry - counter: %v, lastErrLabel: %s lastError: %v, error: %v", retryCounter, lastErrLabel, lastErr, err)
242+
metrics.UpdateEtcdRequestRetry(lastErrLabel)
243+
}
244+
if err == nil {
245+
return true, nil
246+
}
247+
248+
lastErrLabel, retry = retriable(err)
249+
if klog.V(6).Enabled() {
250+
klog.V(6).InfoS("observed storage error", "err", err, "retriable", retry)
251+
}
252+
if retry {
253+
lastErr = err
254+
retryCounter++
255+
return false, nil
256+
}
257+
258+
return false, err
259+
})
260+
if err == wait.ErrWaitTimeout && lastErr != nil {
261+
err = lastErr
262+
}
263+
return err
264+
}
265+
266+
// backoffWithRequestContext works with a request context and a Backoff. It ensures that the retry wait never
267+
// exceeds the deadline specified by the request context.
268+
func backoffWithRequestContext(ctx context.Context, backoff wait.Backoff, condition wait.ConditionFunc) error {
269+
for backoff.Steps > 0 {
270+
if ok, err := condition(); err != nil || ok {
271+
return err
272+
}
273+
274+
if backoff.Steps == 1 {
275+
break
276+
}
277+
278+
waitBeforeRetry := backoff.Step()
279+
select {
280+
case <-ctx.Done():
281+
return ctx.Err()
282+
case <-time.After(waitBeforeRetry):
283+
}
284+
}
285+
286+
return wait.ErrWaitTimeout
287+
}

0 commit comments

Comments
 (0)