Skip to content

Commit

Permalink
add delays when handling retries so we don't tight loop
Browse files Browse the repository at this point in the history
  • Loading branch information
bparees committed Apr 6, 2015
1 parent e218a54 commit e03b50d
Show file tree
Hide file tree
Showing 10 changed files with 72 additions and 8 deletions.
17 changes: 11 additions & 6 deletions pkg/build/controller/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ import (
imageapi "github.com/openshift/origin/pkg/image/api"
)

// logAndRetry retries forever - BuildPodController currently has no fatal errors
func logAndRetry(obj interface{}, err error, _ int) bool {
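// limitedLogAndRetry stops retrying after 60 attempts. Given the throttler configuration
// (a token bucket allowing 1 retry per second with a burst of 10), the first 10 retries can
// happen immediately and the remaining 50 are spaced about a second apart, so this means the
// event has been retried over a period of at least 50 seconds.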
func limitedLogAndRetry(obj interface{}, err error, count int) bool {
kutil.HandleError(err)
return true
return count < 60
}

// BuildControllerFactory constructs BuildController objects
Expand Down Expand Up @@ -62,7 +63,7 @@ func (factory *BuildControllerFactory) Create() controller.RunnableController {

return &controller.RetryController{
Queue: queue,
RetryManager: controller.NewQueueRetryManager(queue, cache.MetaNamespaceKeyFunc, logAndRetry),
RetryManager: controller.NewQueueRetryManager(queue, cache.MetaNamespaceKeyFunc, limitedLogAndRetry, kutil.NewTokenBucketRateLimiter(1, 10)),
Handle: func(obj interface{}) error {
build := obj.(*buildapi.Build)
return buildController.HandleBuild(build)
Expand Down Expand Up @@ -106,7 +107,7 @@ func (factory *BuildPodControllerFactory) Create() controller.RunnableController

return &controller.RetryController{
Queue: queue,
RetryManager: controller.NewQueueRetryManager(queue, cache.MetaNamespaceKeyFunc, logAndRetry),
RetryManager: controller.NewQueueRetryManager(queue, cache.MetaNamespaceKeyFunc, limitedLogAndRetry, kutil.NewTokenBucketRateLimiter(1, 10)),
Handle: func(obj interface{}) error {
pod := obj.(*kapi.Pod)
return buildPodController.HandlePod(pod)
Expand Down Expand Up @@ -145,13 +146,17 @@ func (factory *ImageChangeControllerFactory) Create() controller.RunnableControl
RetryManager: controller.NewQueueRetryManager(
queue,
cache.MetaNamespaceKeyFunc,
func(obj interface{}, err error, _ int) bool {
func(obj interface{}, err error, count int) bool {
kutil.HandleError(err)
if _, isFatal := err.(buildcontroller.ImageChangeControllerFatalError); isFatal {
return false
}
if count > 60 {
return false
}
return true
},
kutil.NewTokenBucketRateLimiter(1, 10),
),
Handle: func(obj interface{}) error {
imageRepo := obj.(*imageapi.ImageStream)
Expand Down
8 changes: 7 additions & 1 deletion pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ type QueueRetryManager struct {

// retries maps resources to their current retry count.
retries map[string]int

// limits how fast retries can be enqueued to ensure you can't tight
// loop on retries.
limiter kutil.RateLimiter
}

// ReQueue is a queue that allows an object to be requeued
Expand All @@ -94,12 +98,13 @@ type ReQueue interface {
}

// NewQueueRetryManager safely creates a new QueueRetryManager.
func NewQueueRetryManager(queue ReQueue, keyFn kcache.KeyFunc, retryFn RetryFunc) *QueueRetryManager {
func NewQueueRetryManager(queue ReQueue, keyFn kcache.KeyFunc, retryFn RetryFunc, limiter kutil.RateLimiter) *QueueRetryManager {
return &QueueRetryManager{
queue: queue,
keyFunc: keyFn,
retryFunc: retryFn,
retries: make(map[string]int),
limiter: limiter,
}
}

Expand All @@ -115,6 +120,7 @@ func (r *QueueRetryManager) Retry(resource interface{}, err error) {
tries := r.retries[id]

if r.retryFunc(resource, err, tries) {
r.limiter.Accept()
// It's important to use AddIfNotPresent to prevent overwriting newer
// state in the queue which may have arrived asynchronously.
r.queue.AddIfNotPresent(resource)
Expand Down
46 changes: 45 additions & 1 deletion pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"

kcache "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
kutil "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)

func TestRetryController_handleOneRetryableError(t *testing.T) {
Expand Down Expand Up @@ -86,6 +87,7 @@ func TestQueueRetryManager_retries(t *testing.T) {
return true
},
retries: make(map[string]int),
limiter: kutil.NewTokenBucketRateLimiter(1000, 1000),
}

objects := []testObj{
Expand Down Expand Up @@ -129,7 +131,7 @@ func TestRetryController_realFifoEventOrdering(t *testing.T) {

controller := &RetryController{
Queue: fifo,
RetryManager: NewQueueRetryManager(fifo, keyFunc, func(_ interface{}, _ error, _ int) bool { return true }),
RetryManager: NewQueueRetryManager(fifo, keyFunc, func(_ interface{}, _ error, _ int) bool { return true }, kutil.NewTokenBucketRateLimiter(1000, 10)),
Handle: func(obj interface{}) error {
if e, a := 1, obj.(testObj).value; e != a {
t.Fatalf("expected to handle test value %d, got %d", e, a)
Expand Down Expand Up @@ -157,6 +159,48 @@ func TestRetryController_realFifoEventOrdering(t *testing.T) {
}
}

// TestRetryController_ratelimit verifies that QueueRetryManager.Retry calls
// the rate limiter's Accept exactly once for every retry the retry function
// approves.
//
// It does NOT measure the actual requeue rate or burst behavior: the limiter
// is a mockLimiter that never blocks and only counts Accept calls. Timing
// guarantees are the responsibility of the real limiter implementation.
func TestRetryController_ratelimit(t *testing.T) {
	// Number of retries issued; each one is expected to hit the limiter once.
	const attempts = 10

	// Every resource maps to the same key, so all retries target one entry.
	keyFunc := func(obj interface{}) (string, error) {
		return "key", nil
	}
	fifo := kcache.NewFIFO(keyFunc)
	limiter := &mockLimiter{}
	retryManager := NewQueueRetryManager(
		fifo,
		keyFunc,
		// The threshold (15) is above the number of attempts made below, so
		// the retry function approves every call in this test.
		func(_ interface{}, _ error, c int) bool { return c < 15 },
		limiter,
	)
	for i := 0; i < attempts; i++ {
		retryManager.Retry("key", nil)
	}

	if limiter.count != attempts {
		t.Fatalf("Retries did not invoke rate limiter, expected %d got %d", attempts, limiter.count)
	}
}

// mockLimiter is a test double for the rate limiter passed to
// NewQueueRetryManager. It never blocks and never rejects; it only records
// how many times Accept has been called.
type mockLimiter struct {
	// count is the number of Accept calls observed so far.
	count int
}

// CanAccept always reports true; the mock never throttles.
func (m *mockLimiter) CanAccept() bool { return true }

// Accept records one invocation and returns immediately.
func (m *mockLimiter) Accept() { m.count++ }

// Stop does nothing; the mock holds no resources to release.
func (m *mockLimiter) Stop() {}

type testObj struct {
id string
value int
Expand Down
1 change: 1 addition & 0 deletions pkg/deploy/controller/configchange/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func (factory *DeploymentConfigChangeControllerFactory) Create() controller.Runn
}
return true
},
kutil.NewTokenBucketRateLimiter(1, 10),
),
Handle: func(obj interface{}) error {
config := obj.(*deployapi.DeploymentConfig)
Expand Down
2 changes: 2 additions & 0 deletions pkg/deploy/controller/deployerpod/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
kutil "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"

controller "github.com/openshift/origin/pkg/controller"
Expand Down Expand Up @@ -73,6 +74,7 @@ func (factory *DeployerPodControllerFactory) Create() controller.RunnableControl
podQueue,
cache.MetaNamespaceKeyFunc,
func(obj interface{}, err error, count int) bool { return count < 1 },
kutil.NewTokenBucketRateLimiter(1, 10),
),
Handle: func(obj interface{}) error {
pod := obj.(*kapi.Pod)
Expand Down
1 change: 1 addition & 0 deletions pkg/deploy/controller/deployment/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func (factory *DeploymentControllerFactory) Create() controller.RunnableControll
}
return true
},
kutil.NewTokenBucketRateLimiter(1, 10),
),
Handle: func(obj interface{}) error {
deployment := obj.(*kapi.ReplicationController)
Expand Down
1 change: 1 addition & 0 deletions pkg/deploy/controller/deploymentconfig/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func (factory *DeploymentConfigControllerFactory) Create() controller.RunnableCo
}
return true
},
kutil.NewTokenBucketRateLimiter(1, 10),
),
Handle: func(obj interface{}) error {
config := obj.(*deployapi.DeploymentConfig)
Expand Down
1 change: 1 addition & 0 deletions pkg/deploy/controller/imagechange/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func (factory *ImageChangeControllerFactory) Create() controller.RunnableControl
}
return true
},
kutil.NewTokenBucketRateLimiter(1, 10),
),
Handle: func(obj interface{}) error {
repo := obj.(*imageapi.ImageStream)
Expand Down
2 changes: 2 additions & 0 deletions pkg/image/controller/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
kutil "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"

"github.com/openshift/origin/pkg/client"
Expand Down Expand Up @@ -50,6 +51,7 @@ func (f *ImportControllerFactory) Create() controller.RunnableController {
util.HandleError(err)
return count < 5
},
kutil.NewTokenBucketRateLimiter(1, 10),
),
Handle: func(obj interface{}) error {
r := obj.(*api.ImageStream)
Expand Down
1 change: 1 addition & 0 deletions pkg/project/controller/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (factory *NamespaceControllerFactory) Create() controller.RunnableControlle
}
return true
},
kutil.NewTokenBucketRateLimiter(1, 10),
),
Handle: func(obj interface{}) error {
namespace := obj.(*kapi.Namespace)
Expand Down

0 comments on commit e03b50d

Please sign in to comment.