diff --git a/runner/sidecar/sidecar.go b/runner/sidecar/sidecar.go index 67278312..d6cb4f6c 100644 --- a/runner/sidecar/sidecar.go +++ b/runner/sidecar/sidecar.go @@ -14,6 +14,7 @@ import ( dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1" tls2 "github.com/argoproj-labs/argo-dataflow/runner/sidecar/tls" sharedutil "github.com/argoproj-labs/argo-dataflow/shared/util" + "github.com/argoproj-labs/argo-dataflow/shared/util/retry" "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -234,10 +235,10 @@ func logMetrics(ctx context.Context) error { } func enrichSpec(ctx context.Context) error { - if err := enrichSources(ctx); err != nil { + if err := retry.WithDefaultRetry(func() error { return enrichSources(ctx) }); err != nil { return err } - return enrichSinks(ctx) + return retry.WithDefaultRetry(func() error { return enrichSinks(ctx) }) } func enrichSources(ctx context.Context) error { diff --git a/shared/util/retry/retry.go b/shared/util/retry/retry.go new file mode 100644 index 00000000..fe2429ab --- /dev/null +++ b/shared/util/retry/retry.go @@ -0,0 +1,19 @@ +package retry + +import ( + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/util/wait" + k8sRetry "k8s.io/client-go/util/retry" +) + +func retryableErrors(err error) bool { + return errors.IsTimeout(err) || errors.IsServerTimeout(err) || errors.IsTooManyRequests(err) +} + +func WithDefaultRetry(fn func() error) error { + return WithRetry(k8sRetry.DefaultBackoff, fn) +} + +func WithRetry(back wait.Backoff, fn func() error) error { + return k8sRetry.OnError(back, retryableErrors, fn) +}