Skip to content

Commit

Permalink
add delays when handling retries so we don't tight loop
Browse files Browse the repository at this point in the history
  • Loading branch information
bparees committed Apr 2, 2015
1 parent 269b892 commit c6f339e
Show file tree
Hide file tree
Showing 11 changed files with 80 additions and 6 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 15 additions & 4 deletions pkg/build/controller/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,14 @@ import (
)

// logAndRetry reports err through kutil.HandleError and then decides whether
// the object should be retried. BuildPodController currently has no fatal
// errors, so the retry count is the only reason to give up: it returns true
// while count is at most 60 and false once count exceeds 60.
func logAndRetry(obj interface{}, err error, _ int) bool {
func logAndRetry(obj interface{}, err error, count int) bool {
kutil.HandleError(err)
if count > 60 {
// Stop retrying once the count exceeds 60. Given the throttler
// configuration used by the callers in this file (rate 1, burst 10),
// this means we've retried this event over a period of at least
// 50 seconds.
return false
}
return true
}

Expand Down Expand Up @@ -62,7 +68,7 @@ func (factory *BuildControllerFactory) Create() controller.RunnableController {

return &controller.RetryController{
Queue: queue,
RetryManager: controller.NewQueueRetryManager(queue, cache.MetaNamespaceKeyFunc, logAndRetry),
RetryManager: controller.NewQueueRetryManager(queue, cache.MetaNamespaceKeyFunc, logAndRetry, 1, 10),
Handle: func(obj interface{}) error {
build := obj.(*buildapi.Build)
return buildController.HandleBuild(build)
Expand Down Expand Up @@ -106,7 +112,7 @@ func (factory *BuildPodControllerFactory) Create() controller.RunnableController

return &controller.RetryController{
Queue: queue,
RetryManager: controller.NewQueueRetryManager(queue, cache.MetaNamespaceKeyFunc, logAndRetry),
RetryManager: controller.NewQueueRetryManager(queue, cache.MetaNamespaceKeyFunc, logAndRetry, 1, 10),
Handle: func(obj interface{}) error {
pod := obj.(*kapi.Pod)
return buildPodController.HandlePod(pod)
Expand Down Expand Up @@ -145,13 +151,18 @@ func (factory *ImageChangeControllerFactory) Create() controller.RunnableControl
RetryManager: controller.NewQueueRetryManager(
queue,
cache.MetaNamespaceKeyFunc,
func(obj interface{}, err error, _ int) bool {
func(obj interface{}, err error, count int) bool {
kutil.HandleError(err)
if _, isFatal := err.(buildcontroller.ImageChangeControllerFatalError); isFatal {
return false
}
if count > 60 {
return false
}
return true
},
1,
10,
),
Handle: func(obj interface{}) error {
imageRepo := obj.(*imageapi.ImageRepository)
Expand Down
8 changes: 7 additions & 1 deletion pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ type QueueRetryManager struct {

// retries maps resources to their current retry count.
retries map[string]int

// limits how fast retries can be enqueued to ensure you can't tight
// loop on retries.
limiter kutil.RateLimiter
}

// ReQueue is a queue that allows an object to be requeued
Expand All @@ -94,12 +98,13 @@ type ReQueue interface {
}

// NewQueueRetryManager safely creates a new QueueRetryManager.
//
// queue, keyFn and retryFn are stored as given, and the retry-count map is
// initialised empty. rate and burst are passed straight to
// kutil.NewTokenBucketRateLimiter to build the limiter that throttles how
// fast retries can be enqueued.
// NOTE(review): rate appears to be retries per second (the rate-limit test
// treats rate 10 as one retry every 0.1s) - confirm against kutil.
func NewQueueRetryManager(queue ReQueue, keyFn kcache.KeyFunc, retryFn RetryFunc) *QueueRetryManager {
func NewQueueRetryManager(queue ReQueue, keyFn kcache.KeyFunc, retryFn RetryFunc, rate float32, burst int) *QueueRetryManager {
return &QueueRetryManager{
queue: queue,
keyFunc: keyFn,
retryFunc: retryFn,
retries: make(map[string]int),
// Token bucket shared by every retry issued through this manager.
limiter: kutil.NewTokenBucketRateLimiter(rate, burst),
}
}

Expand All @@ -115,6 +120,7 @@ func (r *QueueRetryManager) Retry(resource interface{}, err error) {
tries := r.retries[id]

if r.retryFunc(resource, err, tries) {
r.limiter.Accept()
// It's important to use AddIfNotPresent to prevent overwriting newer
// state in the queue which may have arrived asynchronously.
r.queue.AddIfNotPresent(resource)
Expand Down
38 changes: 37 additions & 1 deletion pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"fmt"
"sync"
"testing"
"time"

kcache "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
kutil "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)

func TestRetryController_handleOneRetryableError(t *testing.T) {
Expand Down Expand Up @@ -86,6 +88,7 @@ func TestQueueRetryManager_retries(t *testing.T) {
return true
},
retries: make(map[string]int),
limiter: kutil.NewTokenBucketRateLimiter(1000, 1000),
}

objects := []testObj{
Expand Down Expand Up @@ -129,7 +132,7 @@ func TestRetryController_realFifoEventOrdering(t *testing.T) {

controller := &RetryController{
Queue: fifo,
RetryManager: NewQueueRetryManager(fifo, keyFunc, func(_ interface{}, _ error, _ int) bool { return true }),
RetryManager: NewQueueRetryManager(fifo, keyFunc, func(_ interface{}, _ error, _ int) bool { return true }, 1000, 10),
Handle: func(obj interface{}) error {
if e, a := 1, obj.(testObj).value; e != a {
t.Fatalf("expected to handle test value %d, got %d", e, a)
Expand Down Expand Up @@ -157,6 +160,39 @@ func TestRetryController_realFifoEventOrdering(t *testing.T) {
}
}

// TestRetryController_ratelimit ensures that when events are retried, the
// requeue rate does not exceed the configured rate limit, including burst
// behavior.
//
// The manager is built with rate 10 and burst 5, and the retry function
// permits 15 retries. The first 5 retries are expected to use up the burst
// allowance immediately and the remaining 10 to be admitted at 0.1s each, so
// the whole loop should take no less than 1s.
func TestRetryController_ratelimit(t *testing.T) {
	const (
		// retries is both the number of Retry calls made and the cut-off
		// used by the retry function.
		retries = 15
		// minDuration is the least time the throttled loop may take.
		minDuration = time.Second
	)

	keyFunc := func(obj interface{}) (string, error) {
		return "key", nil
	}
	fifo := kcache.NewFIFO(keyFunc)

	// Start the clock BEFORE the manager (and therefore its limiter) is
	// constructed. If the limiter begins refilling at construction time, a
	// timestamp taken afterwards would let the final retry be admitted a
	// hair before "timestamp + 1s", making the lower-bound check racy.
	start := time.Now()

	retryManager := NewQueueRetryManager(fifo,
		keyFunc,
		func(_ interface{}, _ error, c int) bool {
			return c < retries
		},
		10,
		5)

	for i := 0; i < retries; i++ {
		retryManager.Retry("key", nil)
	}

	if elapsed := time.Since(start); elapsed < minDuration {
		t.Fatalf("retried too fast, expected at least %s but finished in %s", minDuration, elapsed)
	}
}

type testObj struct {
id string
value int
Expand Down
2 changes: 2 additions & 0 deletions pkg/deploy/controller/configchange/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ func (factory *DeploymentConfigChangeControllerFactory) Create() controller.Runn
}
return true
},
1,
10,
),
Handle: func(obj interface{}) error {
config := obj.(*deployapi.DeploymentConfig)
Expand Down
2 changes: 2 additions & 0 deletions pkg/deploy/controller/deployerpod/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ func (factory *DeployerPodControllerFactory) Create() controller.RunnableControl
podQueue,
cache.MetaNamespaceKeyFunc,
func(obj interface{}, err error, count int) bool { return count < 1 },
1,
10,
),
Handle: func(obj interface{}) error {
pod := obj.(*kapi.Pod)
Expand Down
2 changes: 2 additions & 0 deletions pkg/deploy/controller/deployment/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ func (factory *DeploymentControllerFactory) Create() controller.RunnableControll
}
return true
},
1,
10,
),
Handle: func(obj interface{}) error {
deployment := obj.(*kapi.ReplicationController)
Expand Down
2 changes: 2 additions & 0 deletions pkg/deploy/controller/deploymentconfig/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ func (factory *DeploymentConfigControllerFactory) Create() controller.RunnableCo
}
return true
},
1,
10,
),
Handle: func(obj interface{}) error {
config := obj.(*deployapi.DeploymentConfig)
Expand Down
2 changes: 2 additions & 0 deletions pkg/deploy/controller/imagechange/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ func (factory *ImageChangeControllerFactory) Create() controller.RunnableControl
}
return true
},
1,
10,
),
Handle: func(obj interface{}) error {
repo := obj.(*imageapi.ImageRepository)
Expand Down
2 changes: 2 additions & 0 deletions pkg/image/controller/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ func (f *ImportControllerFactory) Create() controller.RunnableController {
util.HandleError(err)
return count < 5
},
1,
10,
),
Handle: func(obj interface{}) error {
r := obj.(*api.ImageRepository)
Expand Down
2 changes: 2 additions & 0 deletions pkg/project/controller/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ func (factory *NamespaceControllerFactory) Create() controller.RunnableControlle
}
return true
},
1,
10,
),
Handle: func(obj interface{}) error {
namespace := obj.(*kapi.Namespace)
Expand Down

0 comments on commit c6f339e

Please sign in to comment.