|
| 1 | +package etcd3retry |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + "fmt" |
| 6 | + "regexp" |
| 7 | + "strings" |
| 8 | + "time" |
| 9 | + |
| 10 | + etcdrpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" |
| 11 | + "google.golang.org/grpc/codes" |
| 12 | + |
| 13 | + "k8s.io/apimachinery/pkg/runtime" |
| 14 | + "k8s.io/apimachinery/pkg/util/wait" |
| 15 | + "k8s.io/apimachinery/pkg/watch" |
| 16 | + "k8s.io/apiserver/pkg/storage" |
| 17 | + "k8s.io/apiserver/pkg/storage/etcd3/metrics" |
| 18 | + "k8s.io/klog/v2" |
| 19 | +) |
| 20 | + |
| 21 | +var DefaultRetry = wait.Backoff{ |
| 22 | + Duration: 300 * time.Millisecond, |
| 23 | + Factor: 2, // double the timeout for every failure |
| 24 | + Jitter: 0.1, |
| 25 | + Steps: 6, // .3 + .6 + 1.2 + 2.4 + 4.8 = 10ish this lets us smooth out short bumps but not long ones and keeps retry behavior closer. |
| 26 | +} |
| 27 | + |
| 28 | +type retryClient struct { |
| 29 | + // All methods of storage.Interface are implemented directly on *retryClient, even when they |
| 30 | + // are purely passthroughs to the delegate. During a rebase, consider whether or not it is |
| 31 | + // safe and appropriate for a new method added to the method set of storage.Interface to |
| 32 | + // perform retries. |
| 33 | + delegate storage.Interface |
| 34 | +} |
| 35 | + |
| 36 | +func (c *retryClient) Stats(ctx context.Context) (storage.Stats, error) { |
| 37 | + return c.delegate.Stats(ctx) |
| 38 | +} |
| 39 | + |
| 40 | +func (c *retryClient) SetKeysFunc(f storage.KeysFunc) { |
| 41 | + c.delegate.SetKeysFunc(f) |
| 42 | +} |
| 43 | + |
| 44 | +func (c *retryClient) CompactRevision() int64 { |
| 45 | + return c.delegate.CompactRevision() |
| 46 | +} |
| 47 | + |
| 48 | +func (c *retryClient) ReadinessCheck() error { |
| 49 | + return c.delegate.ReadinessCheck() |
| 50 | +} |
| 51 | + |
| 52 | +func (c *retryClient) RequestWatchProgress(ctx context.Context) error { |
| 53 | + return c.delegate.RequestWatchProgress(ctx) |
| 54 | +} |
| 55 | + |
| 56 | +func (c *retryClient) Versioner() storage.Versioner { |
| 57 | + return c.delegate.Versioner() |
| 58 | +} |
| 59 | + |
| 60 | +// New returns an etcd3 implementation of storage.Interface. |
| 61 | +func NewRetryingEtcdStorage(delegate storage.Interface) storage.Interface { |
| 62 | + return &retryClient{delegate: delegate} |
| 63 | +} |
| 64 | + |
| 65 | +func (c *retryClient) GetCurrentResourceVersion(ctx context.Context) (uint64, error) { |
| 66 | + var ( |
| 67 | + rv uint64 |
| 68 | + err error |
| 69 | + ) |
| 70 | + return rv, OnError(ctx, DefaultRetry, IsRetriableErrorOnRead, func() error { |
| 71 | + rv, err = c.delegate.GetCurrentResourceVersion(ctx) |
| 72 | + return err |
| 73 | + }) |
| 74 | +} |
| 75 | + |
| 76 | +// Create adds a new object at a key unless it already exists. 'ttl' is time-to-live |
| 77 | +// in seconds (0 means forever). If no error is returned and out is not nil, out will be |
| 78 | +// set to the read value from database. |
| 79 | +func (c *retryClient) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error { |
| 80 | + return OnError(ctx, DefaultRetry, IsRetriableErrorOnWrite, func() error { |
| 81 | + return c.delegate.Create(ctx, key, obj, out, ttl) |
| 82 | + }) |
| 83 | +} |
| 84 | + |
| 85 | +// Delete removes the specified key and returns the value that existed at that spot. |
| 86 | +// If key didn't exist, it will return NotFound storage error. |
| 87 | +func (c *retryClient) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object, opts storage.DeleteOptions) error { |
| 88 | + return OnError(ctx, DefaultRetry, IsRetriableErrorOnWrite, func() error { |
| 89 | + return c.delegate.Delete(ctx, key, out, preconditions, validateDeletion, cachedExistingObject, opts) |
| 90 | + }) |
| 91 | +} |
| 92 | + |
| 93 | +// Watch begins watching the specified key. Events are decoded into API objects, |
| 94 | +// and any items selected by 'p' are sent down to returned watch.Interface. |
| 95 | +// resourceVersion may be used to specify what version to begin watching, |
| 96 | +// which should be the current resourceVersion, and no longer rv+1 |
| 97 | +// (e.g. reconnecting without missing any updates). |
| 98 | +// If resource version is "0", this interface will get current object at given key |
| 99 | +// and send it in an "ADDED" event, before watch starts. |
| 100 | +func (c *retryClient) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { |
| 101 | + var ret watch.Interface |
| 102 | + err := OnError(ctx, DefaultRetry, IsRetriableErrorOnRead, func() error { |
| 103 | + var innerErr error |
| 104 | + ret, innerErr = c.delegate.Watch(ctx, key, opts) |
| 105 | + return innerErr |
| 106 | + }) |
| 107 | + return ret, err |
| 108 | +} |
| 109 | + |
| 110 | +// Get unmarshals json found at key into objPtr. On a not found error, will either |
| 111 | +// return a zero object of the requested type, or an error, depending on 'opts.ignoreNotFound'. |
| 112 | +// Treats empty responses and nil response nodes exactly like a not found error. |
| 113 | +// The returned contents may be delayed, but it is guaranteed that they will |
| 114 | +// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'. |
| 115 | +func (c *retryClient) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error { |
| 116 | + return OnError(ctx, DefaultRetry, IsRetriableErrorOnRead, func() error { |
| 117 | + return c.delegate.Get(ctx, key, opts, objPtr) |
| 118 | + }) |
| 119 | +} |
| 120 | + |
| 121 | +// GetList unmarshalls objects found at key into a *List api object (an object |
| 122 | +// that satisfies runtime.IsList definition). |
| 123 | +// If 'opts.Recursive' is false, 'key' is used as an exact match. If `opts.Recursive' |
| 124 | +// is true, 'key' is used as a prefix. |
| 125 | +// The returned contents may be delayed, but it is guaranteed that they will |
| 126 | +// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'. |
| 127 | +func (c *retryClient) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { |
| 128 | + return OnError(ctx, DefaultRetry, IsRetriableErrorOnRead, func() error { |
| 129 | + return c.delegate.GetList(ctx, key, opts, listObj) |
| 130 | + }) |
| 131 | +} |
| 132 | + |
| 133 | +// GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'destination') |
| 134 | +// retrying the update until success if there is index conflict. |
| 135 | +// Note that object passed to tryUpdate may change across invocations of tryUpdate() if |
| 136 | +// other writers are simultaneously updating it, so tryUpdate() needs to take into account |
| 137 | +// the current contents of the object when deciding how the update object should look. |
| 138 | +// If the key doesn't exist, it will return NotFound storage error if ignoreNotFound=false |
| 139 | +// else `destination` will be set to the zero value of it's type. |
| 140 | +// If the eventual successful invocation of `tryUpdate` returns an output with the same serialized |
| 141 | +// contents as the input, it won't perform any update, but instead set `destination` to an object with those |
| 142 | +// contents. |
| 143 | +// If 'cachedExistingObject' is non-nil, it can be used as a suggestion about the |
| 144 | +// current version of the object to avoid read operation from storage to get it. |
| 145 | +// However, the implementations have to retry in case suggestion is stale. |
| 146 | +// |
| 147 | +// Example: |
| 148 | +// |
| 149 | +// s := /* implementation of Interface */ |
| 150 | +// err := s.GuaranteedUpdate( |
| 151 | +// |
| 152 | +// "myKey", &MyType{}, true, preconditions, |
| 153 | +// func(input runtime.Object, res ResponseMeta) (runtime.Object, *uint64, error) { |
| 154 | +// // Before each invocation of the user defined function, "input" is reset to |
| 155 | +// // current contents for "myKey" in database. |
| 156 | +// curr := input.(*MyType) // Guaranteed to succeed. |
| 157 | +// |
| 158 | +// // Make the modification |
| 159 | +// curr.Counter++ |
| 160 | +// |
| 161 | +// // Return the modified object - return an error to stop iterating. Return |
| 162 | +// // a uint64 to alter the TTL on the object, or nil to keep it the same value. |
| 163 | +// return cur, nil, nil |
| 164 | +// }, cachedExistingObject |
| 165 | +// |
| 166 | +// ) |
| 167 | +func (c *retryClient) GuaranteedUpdate(ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool, |
| 168 | + preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, cachedExistingObject runtime.Object) error { |
| 169 | + return OnError(ctx, DefaultRetry, IsRetriableErrorOnWrite, func() error { |
| 170 | + return c.delegate.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, cachedExistingObject) |
| 171 | + }) |
| 172 | +} |
| 173 | + |
| 174 | +// These errors are coming back from the k8s.io/apiserver storage.Interface, not directly from an |
| 175 | +// etcd client. Classifying them can be fragile since the storage methods may not return etcd client |
| 176 | +// errors directly. |
| 177 | +var errorLabelsBySuffix = map[string]string{ |
| 178 | + "etcdserver: leader changed": "LeaderChanged", |
| 179 | + "etcdserver: no leader": "NoLeader", |
| 180 | + "raft proposal dropped": "ProposalDropped", |
| 181 | + |
| 182 | + "etcdserver: request timed out": "Timeout", |
| 183 | + "etcdserver: request timed out, possibly due to previous leader failure": "Timeout", |
| 184 | + "etcdserver: request timed out, possible due to connection lost": "Timeout", |
| 185 | + "etcdserver: request timed out, waiting for the applied index took too long": "Timeout", |
| 186 | + "etcdserver: server stopped": "Stopped", |
| 187 | +} |
| 188 | + |
| 189 | +var retriableWriteErrorSuffixes = func() *regexp.Regexp { |
| 190 | + // This list should include only errors the caller is certain have no side effects. |
| 191 | + suffixes := []string{ |
| 192 | + "etcdserver: leader changed", |
| 193 | + "etcdserver: no leader", |
| 194 | + "raft proposal dropped", |
| 195 | + } |
| 196 | + return regexp.MustCompile(fmt.Sprintf(`(%s)$`, strings.Join(suffixes, `|`))) |
| 197 | +}() |
| 198 | + |
| 199 | +// IsRetriableErrorOnWrite returns true if and only if a retry should be attempted when the provided |
| 200 | +// error is returned from a write attempt. If the error is retriable, a non-empty string classifying |
| 201 | +// the error is also returned. |
| 202 | +func IsRetriableErrorOnWrite(err error) (string, bool) { |
| 203 | + if suffix := retriableWriteErrorSuffixes.FindString(err.Error()); suffix != "" { |
| 204 | + return errorLabelsBySuffix[suffix], true |
| 205 | + } |
| 206 | + return "", false |
| 207 | +} |
| 208 | + |
| 209 | +var retriableReadErrorSuffixes = func() *regexp.Regexp { |
| 210 | + var suffixes []string |
| 211 | + for suffix := range errorLabelsBySuffix { |
| 212 | + suffixes = append(suffixes, suffix) |
| 213 | + } |
| 214 | + return regexp.MustCompile(fmt.Sprintf(`(%s)$`, strings.Join(suffixes, `|`))) |
| 215 | +}() |
| 216 | + |
| 217 | +// IsRetriableErrorOnRead returns true if and only if a retry should be attempted when the provided |
| 218 | +// error is returned from a read attempt. If the error is retriable, a non-empty string classifying |
| 219 | +// the error is also returned. |
| 220 | +func IsRetriableErrorOnRead(err error) (string, bool) { |
| 221 | + if suffix := retriableReadErrorSuffixes.FindString(err.Error()); suffix != "" { |
| 222 | + return errorLabelsBySuffix[suffix], true |
| 223 | + } |
| 224 | + if etcdError, ok := etcdrpc.Error(err).(etcdrpc.EtcdError); ok && etcdError.Code() == codes.Unavailable { |
| 225 | + return "Unavailable", true |
| 226 | + } |
| 227 | + return "", false |
| 228 | +} |
| 229 | + |
| 230 | +// OnError allows the caller to retry fn in case the error returned by fn is retriable |
| 231 | +// according to the provided function. backoff defines the maximum retries and the wait |
| 232 | +// interval between two retries. |
| 233 | +func OnError(ctx context.Context, backoff wait.Backoff, retriable func(error) (string, bool), fn func() error) error { |
| 234 | + var lastErr error |
| 235 | + var lastErrLabel string |
| 236 | + var retry bool |
| 237 | + var retryCounter int |
| 238 | + err := backoffWithRequestContext(ctx, backoff, func() (bool, error) { |
| 239 | + err := fn() |
| 240 | + if retry { |
| 241 | + klog.V(1).Infof("etcd retry - counter: %v, lastErrLabel: %s lastError: %v, error: %v", retryCounter, lastErrLabel, lastErr, err) |
| 242 | + metrics.UpdateEtcdRequestRetry(lastErrLabel) |
| 243 | + } |
| 244 | + if err == nil { |
| 245 | + return true, nil |
| 246 | + } |
| 247 | + |
| 248 | + lastErrLabel, retry = retriable(err) |
| 249 | + if klog.V(6).Enabled() { |
| 250 | + klog.V(6).InfoS("observed storage error", "err", err, "retriable", retry) |
| 251 | + } |
| 252 | + if retry { |
| 253 | + lastErr = err |
| 254 | + retryCounter++ |
| 255 | + return false, nil |
| 256 | + } |
| 257 | + |
| 258 | + return false, err |
| 259 | + }) |
| 260 | + if err == wait.ErrWaitTimeout && lastErr != nil { |
| 261 | + err = lastErr |
| 262 | + } |
| 263 | + return err |
| 264 | +} |
| 265 | + |
| 266 | +// backoffWithRequestContext works with a request context and a Backoff. It ensures that the retry wait never |
| 267 | +// exceeds the deadline specified by the request context. |
| 268 | +func backoffWithRequestContext(ctx context.Context, backoff wait.Backoff, condition wait.ConditionFunc) error { |
| 269 | + for backoff.Steps > 0 { |
| 270 | + if ok, err := condition(); err != nil || ok { |
| 271 | + return err |
| 272 | + } |
| 273 | + |
| 274 | + if backoff.Steps == 1 { |
| 275 | + break |
| 276 | + } |
| 277 | + |
| 278 | + waitBeforeRetry := backoff.Step() |
| 279 | + select { |
| 280 | + case <-ctx.Done(): |
| 281 | + return ctx.Err() |
| 282 | + case <-time.After(waitBeforeRetry): |
| 283 | + } |
| 284 | + } |
| 285 | + |
| 286 | + return wait.ErrWaitTimeout |
| 287 | +} |
0 commit comments