Skip to content
This repository has been archived by the owner on May 6, 2022. It is now read-only.

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Kibbe committed Nov 3, 2017
1 parent c8fcf51 commit e502e2a
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 40 deletions.
1 change: 1 addition & 0 deletions charts/catalog/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ chart and their default values.
| `controllerManager.brokerRelistInterval` | How often the controller should relist the catalogs of ready brokers; duration format (`20m`, `1h`, etc) | `24h` |
| `useAggregator` | whether or not to set up the controller-manager to go through the main Kubernetes API server's API aggregator | `true` |
| `rbacEnable` | If true, create & use RBAC resources | `true` |
| `asyncBindingOperationsEnabled` | Whether or not alpha support for async binding operations is enabled | `false` |

Specify each parameter using the `--set key=value[,key=value]` argument to
`helm install`.
Expand Down
26 changes: 17 additions & 9 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"

corev1 "k8s.io/api/core/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
Expand All @@ -40,6 +41,7 @@ import (
servicecatalogclientset "github.com/kubernetes-incubator/service-catalog/pkg/client/clientset_generated/clientset/typed/servicecatalog/v1beta1"
informers "github.com/kubernetes-incubator/service-catalog/pkg/client/informers_generated/externalversions/servicecatalog/v1beta1"
listers "github.com/kubernetes-incubator/service-catalog/pkg/client/listers_generated/servicecatalog/v1beta1"
scfeatures "github.com/kubernetes-incubator/service-catalog/pkg/features"
pretty "github.com/kubernetes-incubator/service-catalog/pkg/pretty"
)

Expand Down Expand Up @@ -176,7 +178,10 @@ func (c *controller) Run(workers int, stopCh <-chan struct{}) {
createWorker(c.instanceQueue, "ServiceInstance", maxRetries, true, c.reconcileServiceInstanceKey, stopCh, &waitGroup)
createWorker(c.bindingQueue, "ServiceBinding", maxRetries, true, c.reconcileServiceBindingKey, stopCh, &waitGroup)
createWorker(c.instancePollingQueue, "InstancePoller", maxRetries, false, c.requeueServiceInstanceForPoll, stopCh, &waitGroup)
createWorker(c.bindingPollingQueue, "BindingPoller", maxRetries, false, c.requeueServiceBindingForPoll, stopCh, &waitGroup)

if utilfeature.DefaultFeatureGate.Enabled(scfeatures.AsyncBindingOperations) {
createWorker(c.bindingPollingQueue, "BindingPoller", maxRetries, false, c.requeueServiceBindingForPoll, stopCh, &waitGroup)
}
}

<-stopCh
Expand Down Expand Up @@ -492,17 +497,20 @@ func convertCatalog(in *osb.CatalogResponse) ([]*v1beta1.ClusterServiceClass, []
for i, svc := range in.Services {
serviceClasses[i] = &v1beta1.ClusterServiceClass{
Spec: v1beta1.ClusterServiceClassSpec{
BindingRetrievable: svc.BindingRetrievable,
Bindable: svc.Bindable,
PlanUpdatable: (svc.PlanUpdatable != nil && *svc.PlanUpdatable),
ExternalID: svc.ID,
ExternalName: svc.Name,
Tags: svc.Tags,
Description: svc.Description,
Requires: svc.Requires,
Bindable: svc.Bindable,
PlanUpdatable: (svc.PlanUpdatable != nil && *svc.PlanUpdatable),
ExternalID: svc.ID,
ExternalName: svc.Name,
Tags: svc.Tags,
Description: svc.Description,
Requires: svc.Requires,
},
}

if utilfeature.DefaultFeatureGate.Enabled(scfeatures.AsyncBindingOperations) {
serviceClasses[i].Spec.BindingRetrievable = svc.BindingRetrievable
}

if svc.Metadata != nil {
metadata, err := json.Marshal(svc.Metadata)
if err != nil {
Expand Down
75 changes: 47 additions & 28 deletions pkg/controller/controller_binding.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,13 @@ func (c *controller) bindingAdd(obj interface{}) {
}

func (c *controller) bindingUpdate(oldObj, newObj interface{}) {
c.bindingAdd(newObj)
// Bindings with ongoing asynchronous operations will be manually added
// to the polling queue by the reconciler. They should be ignored here in
// order to enforce polling rate-limiting.
binding := newObj.(*v1beta1.ServiceBinding)
if !binding.Status.AsyncOpInProgress {
c.bindingAdd(newObj)
}
}

func (c *controller) bindingDelete(obj interface{}) {
Expand Down Expand Up @@ -409,6 +415,11 @@ func (c *controller) reconcileServiceBinding(binding *v1beta1.ServiceBinding) er
BindResource: &osb.BindResource{AppGUID: &appGUID},
}

// Asynchronous binding operations is currently ALPHA and not
// enabled by default. To use this feature, you must enable the
// AsyncBindingOperations feature gate. This may be easily set
// by setting `asyncBindingOperationsEnabled=true` when
// deploying the Service Catalog via the Helm charts.
if serviceClass.Spec.BindingRetrievable &&
utilfeature.DefaultFeatureGate.Enabled(scfeatures.AsyncBindingOperations) {

Expand Down Expand Up @@ -660,6 +671,11 @@ func (c *controller) reconcileServiceBinding(binding *v1beta1.ServiceBinding) er
PlanID: servicePlan.Spec.ExternalID,
}

// Asynchronous binding operations is currently ALPHA and not
// enabled by default. To use this feature, you must enable the
// AsyncBindingOperations feature gate. This may be easily set
// by setting `asyncBindingOperationsEnabled=true` when
// deploying the Service Catalog via the Helm charts.
if serviceClass.Spec.BindingRetrievable &&
utilfeature.DefaultFeatureGate.Enabled(scfeatures.AsyncBindingOperations) {

Expand Down Expand Up @@ -1162,7 +1178,6 @@ func (c *controller) pollServiceBinding(binding *v1beta1.ServiceBinding) error {
// deleting or mitigating an orphan; this is more readable than
// checking the timestamps in various places.
mitigatingOrphan := binding.Status.OrphanMitigationInProgress
creating := binding.Status.CurrentOperation == v1beta1.ServiceBindingOperationBind && !mitigatingOrphan
deleting := false
if binding.Status.CurrentOperation == v1beta1.ServiceBindingOperationUnbind || mitigatingOrphan {
deleting = true
Expand Down Expand Up @@ -1191,7 +1206,7 @@ func (c *controller) pollServiceBinding(binding *v1beta1.ServiceBinding) error {
)
}

if !creating {
if deleting {
clearServiceBindingCurrentOperation(binding)

if _, err := c.updateServiceBindingStatus(binding); err != nil {
Expand Down Expand Up @@ -1306,8 +1321,8 @@ func (c *controller) pollServiceBinding(binding *v1beta1.ServiceBinding) error {
glog.V(4).Info(pcb.Message(s))
c.recorder.Event(binding, corev1.EventTypeWarning, errorPollingLastOperationReason, s)

if err := c.checkPollingServiceBindingForReconciliationRetryTimeout(binding); err != nil {
return nil
if c.isServiceBindingReconciliationRetryDurationExceeded(binding) {
return c.reconciliationRetryDurationExceededFinishPollingServiceBinding(binding)
}

return c.continuePollingServiceBinding(binding)
Expand Down Expand Up @@ -1342,8 +1357,8 @@ func (c *controller) pollServiceBinding(binding *v1beta1.ServiceBinding) error {
)
}

if err := c.checkPollingServiceBindingForReconciliationRetryTimeout(binding); err != nil {
return nil
if c.isServiceBindingReconciliationRetryDurationExceeded(binding) {
return c.reconciliationRetryDurationExceededFinishPollingServiceBinding(binding)
}

if _, err := c.updateServiceBindingStatus(binding); err != nil {
Expand Down Expand Up @@ -1421,8 +1436,8 @@ func (c *controller) pollServiceBinding(binding *v1beta1.ServiceBinding) error {
s,
)

if err := c.checkPollingServiceBindingForReconciliationRetryTimeout(binding); err != nil {
return nil
if c.isServiceBindingReconciliationRetryDurationExceeded(binding) {
return c.reconciliationRetryDurationExceededFinishPollingServiceBinding(binding)
}

if _, err := c.updateServiceBindingStatus(binding); err != nil {
Expand All @@ -1445,8 +1460,8 @@ func (c *controller) pollServiceBinding(binding *v1beta1.ServiceBinding) error {
s,
)

if err := c.checkPollingServiceBindingForReconciliationRetryTimeout(binding); err != nil {
return nil
if c.isServiceBindingReconciliationRetryDurationExceeded(binding) {
return c.reconciliationRetryDurationExceededFinishPollingServiceBinding(binding)
}

if _, err := c.updateServiceBindingStatus(binding); err != nil {
Expand Down Expand Up @@ -1530,29 +1545,37 @@ func (c *controller) pollServiceBinding(binding *v1beta1.ServiceBinding) error {
default:
glog.Warning(pcb.Messagef("Got invalid state in LastOperationResponse: %q", response.State))

if err := c.checkPollingServiceBindingForReconciliationRetryTimeout(binding); err != nil {
return nil
if c.isServiceBindingReconciliationRetryDurationExceeded(binding) {
return c.reconciliationRetryDurationExceededFinishPollingServiceBinding(binding)
}

return c.continuePollingServiceBinding(binding)
}
}

// checkPollingServiceBindingForReconciliationTimeout checks to see whether the
// polling binding that has exceeded its reconciliation retry duration. If so, it will
// mark the operation as failed and return an error.
// reconciliationTimeExpired tests if the current Operation State time has
// elapsed the reconciliationRetryDuration time period
func (c *controller) isServiceBindingReconciliationRetryDurationExceeded(binding *v1beta1.ServiceBinding) bool {
if time.Now().After(binding.Status.OperationStartTime.Time.Add(c.reconciliationRetryDuration)) {
return true
}
return false
}

// reconciliationRetryDurationExceededFinishPollingServiceBinding marks the
// binding as failed due to the reconciliation retry duration having been
// exceeded.
//
// The binding resource passed will be directly modified, so make sure it is
// not directly from the cache.
func (c *controller) checkPollingServiceBindingForReconciliationRetryTimeout(binding *v1beta1.ServiceBinding) error {
if time.Now().Before(binding.Status.OperationStartTime.Time.Add(c.reconciliationRetryDuration)) {
return nil
}

func (c *controller) reconciliationRetryDurationExceededFinishPollingServiceBinding(binding *v1beta1.ServiceBinding) error {
pcb := pretty.NewContextBuilder(pretty.ServiceBinding, binding.Namespace, binding.Name)

mitigatingOrphan := binding.Status.OrphanMitigationInProgress
creating := binding.Status.CurrentOperation == v1beta1.ServiceBindingOperationBind && !mitigatingOrphan
deleting := false
if binding.Status.CurrentOperation == v1beta1.ServiceBindingOperationUnbind || mitigatingOrphan {
deleting = true
}

s := "Stopping reconciliation retries because too much time has elapsed"
glog.Infof(pcb.Message(s))
Expand All @@ -1576,7 +1599,7 @@ func (c *controller) checkPollingServiceBindingForReconciliationRetryTimeout(bin
)
}

if !creating {
if deleting {
clearServiceBindingCurrentOperation(binding)

if _, err := c.updateServiceBindingStatus(binding); err != nil {
Expand All @@ -1588,9 +1611,5 @@ func (c *controller) checkPollingServiceBindingForReconciliationRetryTimeout(bin
}
}

if err := c.finishPollingServiceBinding(binding); err != nil {
return err
}

return fmt.Errorf(s)
return c.finishPollingServiceBinding(binding)
}
6 changes: 3 additions & 3 deletions pkg/openapi/openapi_generated.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA
},
"relistDuration": {
SchemaProps: spec.SchemaProps{
Description: "RelistDuration is the frequency by which a controller will relist the broker when the RelistBehavior is set to ServiceBrokerRelistBehaviorDuration.",
Description: "RelistDuration is the frequency by which a controller will relist the broker when the RelistBehavior is set to ServiceBrokerRelistBehaviorDuration. Users are cautioned against configuring low values for the RelistDuration, as this can easily overload the controller manager in an environment with many brokers. The actual interval is intrinsically governed by the configured resync interval of the controller, which acts as a minimum bound. For example, with a resync interval of 5m and a RelistDuration of 2m, relists will occur at the resync interval of 5m.",
Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.Duration"),
},
},
Expand Down Expand Up @@ -1006,14 +1006,14 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA
},
"asyncOpInProgress": {
SchemaProps: spec.SchemaProps{
Description: "AsyncOpInProgress is set to true if there is an ongoing async operation against this ServiceBinding in progress.",
Description: "Currently, this field is ALPHA: it may change or disappear at any time and its data will not be migrated.\n\nAsyncOpInProgress is set to true if there is an ongoing async operation against this ServiceBinding in progress.",
Type: []string{"boolean"},
Format: "",
},
},
"lastOperation": {
SchemaProps: spec.SchemaProps{
Description: "LastOperation is the string that the broker may have returned when an async operation started, it should be sent back to the broker on poll requests as a query param.",
Description: "Currently, this field is ALPHA: it may change or disappear at any time and its data will not be migrated.\n\nLastOperation is the string that the broker may have returned when an async operation started, it should be sent back to the broker on poll requests as a query param.",
Type: []string{"string"},
Format: "",
},
Expand Down

0 comments on commit e502e2a

Please sign in to comment.