Skip to content

Commit

Permalink
Support structured and contextual logging
Browse files Browse the repository at this point in the history
  • Loading branch information
bells17 committed Sep 17, 2023
1 parent cd540b4 commit 7abe4a2
Show file tree
Hide file tree
Showing 8 changed files with 460 additions and 401 deletions.
145 changes: 80 additions & 65 deletions controller/controller.go

Large diffs are not rendered by default.

94 changes: 56 additions & 38 deletions controller/controller_test.go

Large diffs are not rendered by default.

61 changes: 32 additions & 29 deletions controller/volume_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type VolumeStore interface {
// is being saved in background.
// In error is returned, no PV was saved and corresponding PVC needs
// to be re-queued (so whole provisioning needs to be done again).
StoreVolume(claim *v1.PersistentVolumeClaim, volume *v1.PersistentVolume) error
StoreVolume(ctx context.Context, claim *v1.PersistentVolumeClaim, volume *v1.PersistentVolume) error

// Runs any background goroutines for implementation of the interface.
Run(ctx context.Context, threadiness int)
Expand Down Expand Up @@ -83,33 +83,34 @@ func NewVolumeStoreQueue(
}
}

func (q *queueStore) StoreVolume(_ *v1.PersistentVolumeClaim, volume *v1.PersistentVolume) error {
if err := q.doSaveVolume(volume); err != nil {
func (q *queueStore) StoreVolume(ctx context.Context, _ *v1.PersistentVolumeClaim, volume *v1.PersistentVolume) error {
if err := q.doSaveVolume(ctx, volume); err != nil {
q.volumes.Store(volume.Name, volume)
q.queue.Add(volume.Name)
klog.Errorf("Failed to save volume %s: %s", volume.Name, err)
klog.FromContext(ctx).Error(err, "Failed to save volume", "volume", volume.Name)
}
// Consume any error, this Store will retry in background.
return nil
}

func (q *queueStore) Run(ctx context.Context, threadiness int) {
klog.Infof("Starting save volume queue")
logger := klog.FromContext(ctx)
logger.Info("Starting save volume queue")
defer q.queue.ShutDown()

for i := 0; i < threadiness; i++ {
go wait.Until(q.saveVolumeWorker, time.Second, ctx.Done())
go wait.UntilWithContext(ctx, q.saveVolumeWorker, time.Second)
}
<-ctx.Done()
klog.Infof("Stopped save volume queue")
logger.Info("Stopped save volume queue")
}

func (q *queueStore) saveVolumeWorker() {
for q.processNextWorkItem() {
func (q *queueStore) saveVolumeWorker(ctx context.Context) {
for q.processNextWorkItem(ctx) {
}
}

func (q *queueStore) processNextWorkItem() bool {
func (q *queueStore) processNextWorkItem(ctx context.Context) bool {
obj, shutdown := q.queue.Get()
defer q.queue.Done(obj)

Expand Down Expand Up @@ -139,32 +140,33 @@ func (q *queueStore) processNextWorkItem() bool {
return true
}

if err := q.doSaveVolume(volume); err != nil {
if err := q.doSaveVolume(ctx, volume); err != nil {
q.queue.AddRateLimited(volumeName)
utilruntime.HandleError(err)
klog.V(5).Infof("Volume %s enqueued", volume.Name)
klog.FromContext(ctx).V(5).Info("Volume enqueued", "volume", volume.Name)
return true
}
q.volumes.Delete(volumeName)
q.queue.Forget(volumeName)
return true
}

func (q *queueStore) doSaveVolume(volume *v1.PersistentVolume) error {
klog.V(5).Infof("Saving volume %s", volume.Name)
func (q *queueStore) doSaveVolume(ctx context.Context, volume *v1.PersistentVolume) error {
logger := klog.FromContext(ctx)
logger.V(5).Info("Saving volume", "volume", volume.Name)
_, err := q.client.CoreV1().PersistentVolumes().Create(context.Background(), volume, metav1.CreateOptions{})
if err == nil || apierrs.IsAlreadyExists(err) {
klog.V(5).Infof("Volume %s saved", volume.Name)
q.sendSuccessEvent(volume)
logger.V(5).Info("Volume saved", "volume", volume.Name)
q.sendSuccessEvent(ctx, volume)
return nil
}
return fmt.Errorf("error saving volume %s: %s", volume.Name, err)
}

func (q *queueStore) sendSuccessEvent(volume *v1.PersistentVolume) {
func (q *queueStore) sendSuccessEvent(ctx context.Context, volume *v1.PersistentVolume) {
claimObjs, err := q.claimsIndexer.ByIndex(uidIndex, string(volume.Spec.ClaimRef.UID))
if err != nil {
klog.V(2).Infof("Error sending event to claim %s: %s", volume.Spec.ClaimRef.UID, err)
klog.FromContext(ctx).V(2).Info("Error sending event to claim", "claimUID", volume.Spec.ClaimRef.UID, "err", err)
return
}
if len(claimObjs) != 1 {
Expand Down Expand Up @@ -205,23 +207,24 @@ func NewBackoffStore(client kubernetes.Interface,
}
}

func (b *backoffStore) StoreVolume(claim *v1.PersistentVolumeClaim, volume *v1.PersistentVolume) error {
func (b *backoffStore) StoreVolume(ctx context.Context, claim *v1.PersistentVolumeClaim, volume *v1.PersistentVolume) error {
// Try to create the PV object several times
logger := klog.FromContext(ctx)
var lastSaveError error
err := wait.ExponentialBackoff(*b.backoff, func() (bool, error) {
klog.Infof("Trying to save persistentvolume %q", volume.Name)
logger.Info("Trying to save persistentvolume", "persistentvolume", volume.Name)
var err error
if _, err = b.client.CoreV1().PersistentVolumes().Create(context.Background(), volume, metav1.CreateOptions{}); err == nil || apierrs.IsAlreadyExists(err) {
// Save succeeded.
if err != nil {
klog.Infof("persistentvolume %q already exists, reusing", volume.Name)
logger.Info("Persistentvolume already exists, reusing", "persistentvolume", volume.Name)
} else {
klog.Infof("persistentvolume %q saved", volume.Name)
logger.Info("Persistentvolume saved", "persistentvolume", volume.Name)
}
return true, nil
}
// Save failed, try again after a while.
klog.Infof("Failed to save persistentvolume %q: %v", volume.Name, err)
logger.Info("Failed to save persistentvolume", "persistentvolume", volume.Name, "err", err)
lastSaveError = err
return false, nil
})
Expand All @@ -237,27 +240,27 @@ func (b *backoffStore) StoreVolume(claim *v1.PersistentVolumeClaim, volume *v1.P
// but we don't have appropriate PV object for it.
// Emit some event here and try to delete the storage asset several
// times.
strerr := fmt.Sprintf("Error creating provisioned PV object for claim %s: %v. Deleting the volume.", claimToClaimKey(claim), lastSaveError)
klog.Error(strerr)
logger.Error(lastSaveError, "Error creating provisioned PV object for claim. Deleting the volume.", "claim", klog.KObj(claim))
strerr := fmt.Sprintf("Error creating provisioned PV object for claim %s: %v. Deleting the volume.", klog.KObj(claim), lastSaveError)
b.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", strerr)

var lastDeleteError error
err = wait.ExponentialBackoff(*b.backoff, func() (bool, error) {
if err = b.ctrl.provisioner.Delete(context.Background(), volume); err == nil {
// Delete succeeded
klog.Infof("Cleaning volume %q succeeded", volume.Name)
logger.Info("Cleaning volume succeeded", "volume", volume.Name)
return true, nil
}
// Delete failed, try again after a while.
klog.Infof("Failed to clean volume %q: %v", volume.Name, err)
logger.Info("Failed to clean volume", "volume", volume.Name, "err", err)
lastDeleteError = err
return false, nil
})
if err != nil {
// Delete failed several times. There is an orphaned volume and there
// is nothing we can do about it.
strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), lastDeleteError)
klog.Error(strerr)
logger.Error(lastSaveError, "Error cleaning provisioned volume for claim. Please delete manually.", "claim", klog.KObj(claim))
strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", klog.KObj(claim), lastDeleteError)
b.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningCleanupFailed", strerr)
}

Expand Down
8 changes: 4 additions & 4 deletions examples/hostpath-provisioner/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ module sigs.k8s.io/kubernetes-sigs/sig-storage-lib-external-provisioner/examples
go 1.16

require (
k8s.io/api v0.19.1
k8s.io/apimachinery v0.19.1
k8s.io/client-go v0.19.1
k8s.io/klog/v2 v2.3.0
k8s.io/api v0.28.0
k8s.io/apimachinery v0.28.0
k8s.io/client-go v0.28.0
k8s.io/klog/v2 v2.100.1
sigs.k8s.io/sig-storage-lib-external-provisioner/v9 v9.0.1
)

Expand Down
Loading

0 comments on commit 7abe4a2

Please sign in to comment.