Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,29 +26,59 @@ var DefaultRetry = wait.Backoff{
}

type retryClient struct {
// embed because we only want to override a few states
storage.Interface
// All methods of storage.Interface are implemented directly on *retryClient, even when they
// are purely passthroughs to the delegate. During a rebase, consider whether or not it is
// safe and appropriate for a new method added to the method set of storage.Interface to
// perform retries.
delegate storage.Interface

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the same code as in openshift/kubernetes-apiserver#72

}

func (c *retryClient) Count(key string) (int64, error) {
return c.delegate.Count(key)
}

func (c *retryClient) ReadinessCheck() error {
return c.delegate.ReadinessCheck()
}

func (c *retryClient) RequestWatchProgress(ctx context.Context) error {
return c.delegate.RequestWatchProgress(ctx)
}

func (c *retryClient) Versioner() storage.Versioner {
return c.delegate.Versioner()
}

// New returns an etcd3 implementation of storage.Interface.
func NewRetryingEtcdStorage(delegate storage.Interface) storage.Interface {
return &retryClient{Interface: delegate}
return &retryClient{delegate: delegate}
}

func (c *retryClient) GetCurrentResourceVersion(ctx context.Context) (uint64, error) {
var (
rv uint64
err error
)
return rv, OnError(ctx, DefaultRetry, IsRetriableErrorOnRead, func() error {
rv, err = c.delegate.GetCurrentResourceVersion(ctx)
return err
})
}

// Create adds a new object at a key unless it already exists. 'ttl' is time-to-live
// in seconds (0 means forever). If no error is returned and out is not nil, out will be
// set to the read value from database.
func (c *retryClient) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
return OnError(ctx, DefaultRetry, IsRetriableErrorOnWrite, func() error {
return c.Interface.Create(ctx, key, obj, out, ttl)
return c.delegate.Create(ctx, key, obj, out, ttl)
})
}

// Delete removes the specified key and returns the value that existed at that spot.
// If key didn't exist, it will return NotFound storage error.
func (c *retryClient) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object, opts storage.DeleteOptions) error {
return OnError(ctx, DefaultRetry, IsRetriableErrorOnWrite, func() error {
return c.Interface.Delete(ctx, key, out, preconditions, validateDeletion, cachedExistingObject, opts)
return c.delegate.Delete(ctx, key, out, preconditions, validateDeletion, cachedExistingObject, opts)
})
}

Expand All @@ -63,7 +93,7 @@ func (c *retryClient) Watch(ctx context.Context, key string, opts storage.ListOp
var ret watch.Interface
err := OnError(ctx, DefaultRetry, IsRetriableErrorOnRead, func() error {
var innerErr error
ret, innerErr = c.Interface.Watch(ctx, key, opts)
ret, innerErr = c.delegate.Watch(ctx, key, opts)
return innerErr
})
return ret, err
Expand All @@ -76,7 +106,7 @@ func (c *retryClient) Watch(ctx context.Context, key string, opts storage.ListOp
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
func (c *retryClient) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
return OnError(ctx, DefaultRetry, IsRetriableErrorOnRead, func() error {
return c.Interface.Get(ctx, key, opts, objPtr)
return c.delegate.Get(ctx, key, opts, objPtr)
})
}

Expand All @@ -88,7 +118,7 @@ func (c *retryClient) Get(ctx context.Context, key string, opts storage.GetOptio
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
func (c *retryClient) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
return OnError(ctx, DefaultRetry, IsRetriableErrorOnRead, func() error {
return c.Interface.GetList(ctx, key, opts, listObj)
return c.delegate.GetList(ctx, key, opts, listObj)
})
}

Expand Down Expand Up @@ -129,7 +159,7 @@ func (c *retryClient) GetList(ctx context.Context, key string, opts storage.List
func (c *retryClient) GuaranteedUpdate(ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool,
preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, cachedExistingObject runtime.Object) error {
return OnError(ctx, DefaultRetry, IsRetriableErrorOnWrite, func() error {
return c.Interface.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, cachedExistingObject)
return c.delegate.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, cachedExistingObject)
})
}

Expand Down