Skip to content

Commit b24f93e

Browse files
committed
UPSTREAM: <carry>: delay queuing deletion for PV to allow nodes some time to unmount
1 parent c501845 commit b24f93e

File tree

2 files changed

+21
-4
lines changed

2 files changed

+21
-4
lines changed

pkg/controller/volume/persistentvolume/pv_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ type PersistentVolumeController struct {
202202
// however overall speed of multi-worker controller would be lower than if
203203
// it runs single thread only.
204204
claimQueue *workqueue.Type
205-
volumeQueue *workqueue.Type
205+
volumeQueue workqueue.RateLimitingInterface
206206

207207
// Map of scheduled/running operations.
208208
runningOperations goroutinemap.GoRoutineMap

pkg/controller/volume/persistentvolume/pv_controller_base.go

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func NewController(p ControllerParameters) (*PersistentVolumeController, error)
9797
createProvisionedPVRetryCount: createProvisionedPVRetryCount,
9898
createProvisionedPVInterval: createProvisionedPVInterval,
9999
claimQueue: workqueue.NewNamed("claims"),
100-
volumeQueue: workqueue.NewNamed("volumes"),
100+
volumeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "volumes"),
101101
resyncPeriod: p.SyncPeriod,
102102
operationTimestamps: metrics.NewOperationStartTimeCache(),
103103
}
@@ -111,7 +111,7 @@ func NewController(p ControllerParameters) (*PersistentVolumeController, error)
111111
cache.ResourceEventHandlerFuncs{
112112
AddFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
113113
UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.volumeQueue, newObj) },
114-
DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
114+
DeleteFunc: func(obj interface{}) { controller.enqueueWorkAfter(controller.volumeQueue, obj, 21*time.Second) },
115115
},
116116
)
117117
controller.volumeLister = p.VolumeInformer.Lister()
@@ -194,6 +194,20 @@ func (ctrl *PersistentVolumeController) enqueueWork(queue workqueue.Interface, o
194194
queue.Add(objName)
195195
}
196196

197+
// enqueueWorkAfter derives a key for obj with controller.KeyFunc and hands it
// to queue.AddAfter together with delay. If obj is a
// cache.DeletedFinalStateUnknown carrying a non-nil Obj, the wrapped object is
// used to derive the key instead. When no key can be derived, the error is
// logged and nothing is added to the queue.
func (ctrl *PersistentVolumeController) enqueueWorkAfter(queue workqueue.DelayingInterface, obj interface{}, delay time.Duration) {
198+
// Beware of "xxx deleted" events: the object may arrive wrapped in a
// cache.DeletedFinalStateUnknown, so unwrap it before computing the key.
199+
if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
200+
obj = unknown.Obj
201+
}
202+
objName, err := controller.KeyFunc(obj)
203+
if err != nil {
204+
klog.Errorf("failed to get key from object: %v", err)
205+
return
206+
}
207+
// Logged before the delayed add; the message itself does not mention the delay.
klog.V(5).Infof("enqueued %q for sync", objName)
208+
queue.AddAfter(objName, delay)
209+
}
210+
197211
// storeVolumeUpdate passes volume to storeObjectUpdate together with
// ctrl.volumes.store and the string "volume", and returns that call's results
// unchanged.
func (ctrl *PersistentVolumeController) storeVolumeUpdate(volume interface{}) (bool, error) {
198212
return storeObjectUpdate(ctrl.volumes.store, volume, "volume")
199213
}
@@ -295,8 +309,11 @@ func (ctrl *PersistentVolumeController) deleteClaim(claim *v1.PersistentVolumeCl
295309
// sync the volume when its claim is deleted. Explicitly sync'ing the
296310
// volume here in response to claim deletion prevents the volume from
297311
// waiting until the next sync period for its Release.
312+
// Delay queuing the volume to allow some time for nodes to detach the volume from the node. The delay chosen here
313+
// is hopefully short enough that e2e tests still pass and long enough that most PVs stop hitting failure
314+
// errors.
298315
klog.V(5).Infof("deleteClaim[%q]: scheduling sync of volume %s", claimKey, volumeName)
299-
ctrl.volumeQueue.Add(volumeName)
316+
ctrl.volumeQueue.AddAfter(volumeName, 21*time.Second)
300317
}
301318

302319
// Run starts all of this controller's control loops

0 commit comments

Comments
 (0)