Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pkg/cmd/server/origin/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
osclient "github.com/openshift/origin/pkg/client"
cmdutil "github.com/openshift/origin/pkg/cmd/util"
"github.com/openshift/origin/pkg/cmd/util/clientcmd"
deployconfigcontroller "github.com/openshift/origin/pkg/deploy/controller/deploymentconfig"
deploycontrollerfactory "github.com/openshift/origin/pkg/deploy/controller/factory"
deployconfiggenerator "github.com/openshift/origin/pkg/deploy/generator"
deployregistry "github.com/openshift/origin/pkg/deploy/registry/deploy"
Expand Down Expand Up @@ -710,7 +711,7 @@ func (c *MasterConfig) RunDeploymentController() {

func (c *MasterConfig) RunDeploymentConfigController() {
osclient, kclient := c.DeploymentConfigControllerClients()
factory := deploycontrollerfactory.DeploymentConfigControllerFactory{
factory := deployconfigcontroller.DeploymentConfigControllerFactory{
Client: osclient,
KubeClient: kclient,
Codec: latest.Codec,
Expand Down
126 changes: 126 additions & 0 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package controller

import (
kcache "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
kutil "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)

// RunnableController is a controller which implements a Run loop.
type RunnableController interface {
// Run starts the asynchronous controller loop.
Run()
}

// RetryController is a RunnableController which delegates resource
// handling to a function and knows how to safely manage retries of a resource
// which failed to be successfully handled.
type RetryController struct {
// Queue is where work is retrieved for Handle.
Queue Queue

// Handle is expected to process the next resource from the queue.
Handle func(interface{}) error

// ShouldRetry returns true if the resource and error returned from
// HandleNext should trigger a retry via the RetryManager.
ShouldRetry func(interface{}, error) bool

// RetryManager is fed the handled resource if Handle returns a Retryable
// error. If Handle returns no error, the RetryManager is asked to forget
// the resource.
RetryManager RetryManager
}

// Queue is a narrow abstraction of a cache.FIFO.
type Queue interface {
Pop() interface{}
AddIfNotPresent(interface{}) error
}

// Run begins processing resources from Queue asynchronously.
func (c *RetryController) Run() {
go kutil.Forever(func() { c.handleOne(c.Queue.Pop()) }, 0)
}

// handleOne processes resource with Handle. If Handle returns a retryable
// error, the handled resource is passed to the RetryManager. If no error is
// returned from Handle, the RetryManager is asked to forget the processed
// resource.
func (c *RetryController) handleOne(resource interface{}) {
err := c.Handle(resource)
if err != nil {
if c.ShouldRetry(resource, err) {
c.RetryManager.Retry(resource)
return
}
}
c.RetryManager.Forget(resource)
}

// RetryManager knows how to retry processing of a resource, and how to forget
// a resource it may be tracking the state of.
type RetryManager interface {
// Retry will cause resource processing to be retried (for example, by
// requeueing resource)
Retry(resource interface{})

// Forget will cause the manager to erase all prior knowledge of resource
// and reclaim internal resources associated with state tracking of
// resource.
Forget(resource interface{})
}

// QueueRetryManager retries a resource by re-queueing it into a Queue up to
// MaxRetries number of times.
type QueueRetryManager struct {
// queue is where resources are re-queued.
queue Queue

// keyFunc is used to index resources.
keyFunc kcache.KeyFunc

// maxRetries is the total number of attempts to requeue an individual
// resource before giving up. A value of -1 is interpreted as retry forever.
maxRetries int

// retries maps resources to their current retry count.
retries map[string]int
}

// NewQueueRetryManager safely creates a new QueueRetryManager.
func NewQueueRetryManager(queue Queue, keyFunc kcache.KeyFunc, maxRetries int) *QueueRetryManager {
return &QueueRetryManager{
queue: queue,
keyFunc: keyFunc,
maxRetries: maxRetries,
retries: make(map[string]int),
}
}

// Retry will enqueue resource until maxRetries for that resource has been
// exceeded, at which point resource will be forgotten and no longer retried.
//
// A maxRetries value of -1 is interpreted as retry forever.
func (r *QueueRetryManager) Retry(resource interface{}) {
id, _ := r.keyFunc(resource)

if _, exists := r.retries[id]; !exists {
r.retries[id] = 0
}
tries := r.retries[id]

if tries < r.maxRetries || r.maxRetries == -1 {
// It's important to use AddIfNotPresent to prevent overwriting newer
// state in the queue which may have arrived asynchronously.
r.queue.AddIfNotPresent(resource)
r.retries[id] = tries + 1
} else {
r.Forget(resource)
}
}

// Forget resets the retry count for resource.
func (r *QueueRetryManager) Forget(resource interface{}) {
id, _ := r.keyFunc(resource)
delete(r.retries, id)
}
Loading