Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 24 additions & 23 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,29 +617,6 @@ func NewProvisionController(
controller.claimQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims")
controller.volumeQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "volumes")

if controller.createProvisionerPVLimiter != nil {
glog.V(2).Infof("Using saving PVs to API server in background")
controller.volumeStore = NewVolumeStoreQueue(client, controller.createProvisionerPVLimiter)
} else {
if controller.createProvisionedPVBackoff == nil {
// Use linear backoff with createProvisionedPVInterval and createProvisionedPVRetryCount by default.
if controller.createProvisionedPVInterval == 0 {
controller.createProvisionedPVInterval = DefaultCreateProvisionedPVInterval
}
if controller.createProvisionedPVRetryCount == 0 {
controller.createProvisionedPVRetryCount = DefaultCreateProvisionedPVRetryCount
}
controller.createProvisionedPVBackoff = &wait.Backoff{
Duration: controller.createProvisionedPVInterval,
Factor: 1, // linear backoff
Steps: controller.createProvisionedPVRetryCount,
//Cap: controller.createProvisionedPVInterval,
}
}
glog.V(2).Infof("Using blocking saving PVs to API server")
controller.volumeStore = NewBackoffStore(client, controller.eventRecorder, controller.createProvisionedPVBackoff, controller)
}

informer := informers.NewSharedInformerFactory(client, controller.resyncPeriod)

// ----------------------
Expand Down Expand Up @@ -698,6 +675,30 @@ func NewProvisionController(
}
}
controller.classes = controller.classInformer.GetStore()

if controller.createProvisionerPVLimiter != nil {
glog.V(2).Infof("Using saving PVs to API server in background")
controller.volumeStore = NewVolumeStoreQueue(client, controller.createProvisionerPVLimiter, controller.claimsIndexer, controller.eventRecorder)
} else {
if controller.createProvisionedPVBackoff == nil {
// Use linear backoff with createProvisionedPVInterval and createProvisionedPVRetryCount by default.
if controller.createProvisionedPVInterval == 0 {
controller.createProvisionedPVInterval = DefaultCreateProvisionedPVInterval
}
if controller.createProvisionedPVRetryCount == 0 {
controller.createProvisionedPVRetryCount = DefaultCreateProvisionedPVRetryCount
}
controller.createProvisionedPVBackoff = &wait.Backoff{
Duration: controller.createProvisionedPVInterval,
Factor: 1, // linear backoff
Steps: controller.createProvisionedPVRetryCount,
//Cap: controller.createProvisionedPVInterval,
}
}
glog.V(2).Infof("Using blocking saving PVs to API server")
controller.volumeStore = NewBackoffStore(client, controller.eventRecorder, controller.createProvisionedPVBackoff, controller)
}

return controller
}

Expand Down
2 changes: 1 addition & 1 deletion controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ func TestController(t *testing.T) {
}

if test.volumeQueueStore {
ctrl.volumeStore = NewVolumeStoreQueue(client, workqueue.DefaultItemBasedRateLimiter())
ctrl.volumeStore = NewVolumeStoreQueue(client, workqueue.DefaultItemBasedRateLimiter(), ctrl.claimsIndexer, ctrl.eventRecorder)
}

if test.enqueueClaim != nil {
Expand Down
35 changes: 30 additions & 5 deletions controller/volume_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ import (

"k8s.io/client-go/tools/record"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
)
Expand Down Expand Up @@ -56,8 +57,10 @@ type VolumeStore interface {
// PVs to API server using a workqueue running in its own goroutine(s).
// After failed save, volume is re-qeueued with exponential backoff.
type queueStore struct {
client kubernetes.Interface
queue workqueue.RateLimitingInterface
client kubernetes.Interface
queue workqueue.RateLimitingInterface
eventRecorder record.EventRecorder
claimsIndexer cache.Indexer

volumes sync.Map
}
Expand All @@ -68,11 +71,15 @@ var _ VolumeStore = &queueStore{}
func NewVolumeStoreQueue(
client kubernetes.Interface,
limiter workqueue.RateLimiter,
claimsIndexer cache.Indexer,
eventRecorder record.EventRecorder,
) VolumeStore {

return &queueStore{
client: client,
queue: workqueue.NewNamedRateLimitingQueue(limiter, "unsavedpvs"),
client: client,
queue: workqueue.NewNamedRateLimitingQueue(limiter, "unsavedpvs"),
claimsIndexer: claimsIndexer,
eventRecorder: eventRecorder,
}
}

Expand Down Expand Up @@ -148,11 +155,29 @@ func (q *queueStore) doSaveVolume(volume *v1.PersistentVolume) error {
_, err := q.client.CoreV1().PersistentVolumes().Create(volume)
if err == nil || apierrs.IsAlreadyExists(err) {
klog.V(5).Infof("Volume %s saved", volume.Name)
q.sendSuccessEvent(volume)
return nil
}
return fmt.Errorf("error saving volume %s: %s", volume.Name, err)
}

func (q *queueStore) sendSuccessEvent(volume *v1.PersistentVolume) {
claimObjs, err := q.claimsIndexer.ByIndex(uidIndex, string(volume.Spec.ClaimRef.UID))
if err != nil {
klog.V(2).Infof("Error sending event to claim %s: %s", volume.Spec.ClaimRef.UID, err)
return
}
if len(claimObjs) != 1 {
return
}
claim, ok := claimObjs[0].(*v1.PersistentVolumeClaim)
if !ok {
return
}
msg := fmt.Sprintf("Successfully provisioned volume %s", volume.Name)
q.eventRecorder.Event(claim, v1.EventTypeNormal, "ProvisioningSucceeded", msg)
}

// backoffStore is implementation of VolumeStore that blocks and tries to save
// a volume to API server with configurable backoff. If saving fails,
// StoreVolume() deletes the storage asset in the end and returns appropriate
Expand Down