Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 20 additions & 14 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1026,22 +1026,9 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context)
// IndexerInformer uses a delta queue, therefore for deletes we have to use this
// key function.

// Remove finalizers from Pods if they exist before deletion
pods := wfc.kubeclientset.CoreV1().Pods(wfc.GetManagedNamespace())
podList, err := pods.List(ctx, metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s", common.LabelKeyWorkflow, obj.(*unstructured.Unstructured).GetName()),
})
if err != nil {
logger.WithError(err).Error(ctx, "Failed to list pods")
}
for _, p := range podList.Items {
if slices.Contains(p.Finalizers, common.FinalizerPodStatus) {
wfc.PodController.RemoveFinalizer(ctx, p.Namespace, p.Name)
}
}

key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
wfc.removeAllPodFinalizer(ctx, obj)
wfc.releaseAllWorkflowLocks(ctx, obj)
wfc.recordCompletedWorkflow(key)
// no need to add to the queue - this workflow is done
Expand Down Expand Up @@ -1293,6 +1280,25 @@ func (wfc *WorkflowController) getMetricsServerConfig() *telemetry.MetricsConfig
return &metricsConfig
}

func (wfc *WorkflowController) removeAllPodFinalizer(ctx context.Context, obj any) {
un, ok := obj.(*unstructured.Unstructured)
logger := logging.RequireLoggerFromContext(ctx)
if !ok {
logger.WithField("key", obj).Warn(ctx, "Key in index is not an unstructured")
return
}
wf, err := util.FromUnstructured(un)
if err != nil {
logger.WithField("key", obj).Warn(ctx, "Invalid workflow object")
return
}
for _, node := range wf.Status.Nodes {
if node.Type == wfv1.NodeTypePod {
wfc.PodController.RemoveFinalizer(ctx, wf.Namespace, node.ID)
}
}
}

func (wfc *WorkflowController) releaseAllWorkflowLocks(ctx context.Context, obj any) {
un, ok := obj.(*unstructured.Unstructured)
logger := logging.RequireLoggerFromContext(ctx)
Expand Down
Loading