diff --git a/docs/environment-variables.md b/docs/environment-variables.md index 4b3e2ed332cb..dfee85c59bcf 100644 --- a/docs/environment-variables.md +++ b/docs/environment-variables.md @@ -26,6 +26,7 @@ Note that these environment variables may be removed at any time. | `LEADER_ELECTION_RETRY_PERIOD` | `time.Duration` | `5s` | The duration that the leader election clients should wait between tries of actions. | | `MAX_OPERATION_TIME` | `time.Duration` | `30s` | The maximum time a workflow operation is allowed to run for before requeuing the workflow onto the work queue. | | `OFFLOAD_NODE_STATUS_TTL` | `time.Duration` | `5m` | The TTL to delete the offloaded node status. Currently only used for testing. | +| `POD_NAMES` | `string` | `v2` | Whether to have pod names contain the template name (v2) or be the node id (v1). | | `RECENTLY_STARTED_POD_DURATION` | `time.Duration` | `10s` | The duration of a pod before the pod is considered to be recently started. | | `RETRY_BACKOFF_DURATION` | `time.Duration` | `10ms` | The retry backoff duration when retrying API calls. | | `RETRY_BACKOFF_FACTOR` | `float` | `2.0` | The retry backoff factor when retrying API calls. | diff --git a/test/e2e/fixtures/then.go b/test/e2e/fixtures/then.go index b2a40c848e8d..38978171e997 100644 --- a/test/e2e/fixtures/then.go +++ b/test/e2e/fixtures/then.go @@ -18,6 +18,7 @@ import ( wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/typed/workflow/v1alpha1" "github.com/argoproj/argo-workflows/v3/workflow/hydrator" + "github.com/argoproj/argo-workflows/v3/workflow/util" ) type Then struct { @@ -87,7 +88,8 @@ func (t *Then) ExpectWorkflowNode(selector func(status wfv1.NodeStatus) bool, f if n.Type == wfv1.NodeTypePod { var err error ctx := context.Background() - p, err = t.kubeClient.CoreV1().Pods(t.wf.Namespace).Get(ctx, n.ID, metav1.GetOptions{}) + podName := util.PodName(t.wf.Name, n.Name, n.TemplateName, n.ID) + p, err = t.kubeClient.CoreV1().Pods(t.wf.Namespace).Get(ctx, podName, metav1.GetOptions{}) if err != nil { if !apierr.IsNotFound(err) { t.t.Error(err) diff --git a/workflow/common/common.go b/workflow/common/common.go index 4a2d832deb23..1b12141a7498 100644 --- a/workflow/common/common.go +++ b/workflow/common/common.go @@ -15,6 +15,10 @@ const ( // DockerSockVolumeName is the volume name for the /var/run/docker.sock host path volume DockerSockVolumeName = "docker-sock" + // AnnotationKeyNodeID is the ID of the node. + // Historically, the pod name was the same as the node ID. + // Therefore, if it does not exist, then the node ID is the pod name. + AnnotationKeyNodeID = workflow.WorkflowFullName + "/node-id" // AnnotationKeyNodeName is the pod metadata annotation key containing the workflow node name AnnotationKeyNodeName = workflow.WorkflowFullName + "/node-name" // AnnotationKeyNodeName is the node's type diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 870fd5919b1d..306fd8e7798c 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -1029,6 +1029,7 @@ func (wfc *WorkflowController) newPodInformer(ctx context.Context) cache.SharedI source := wfc.newWorkflowPodWatch(ctx) informer := cache.NewSharedIndexInformer(source, &apiv1.Pod{}, podResyncPeriod, cache.Indexers{ indexes.WorkflowIndex: indexes.MetaWorkflowIndexFunc, + indexes.NodeIDIndex: indexes.MetaNodeIDIndexFunc, indexes.PodPhaseIndex: indexes.PodPhaseIndexFunc, }) informer.AddEventHandler( diff --git a/workflow/controller/exec_control.go b/workflow/controller/exec_control.go index 961faf16c6e3..338de07d49e4 100644 --- a/workflow/controller/exec_control.go +++ b/workflow/controller/exec_control.go @@ -19,6 +19,9 @@ func (woc *wfOperationCtx) applyExecutionControl(ctx context.Context, pod *apiv1 if pod == nil { return } + + nodeID := woc.nodeID(pod) + switch pod.Status.Phase { case apiv1.PodSucceeded, apiv1.PodFailed: // Skip any pod which are already completed @@ -33,10 +36,8 @@ func (woc *wfOperationCtx) applyExecutionControl(ctx context.Context, pod *apiv1 woc.log.Infof("Deleting Pending pod %s/%s as part of workflow shutdown with strategy: %s", pod.Namespace, pod.Name, woc.GetShutdownStrategy()) err := woc.controller.kubeclientset.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}) if err == nil { - wfNodesLock.Lock() - node := woc.wf.Status.Nodes[pod.Name] - wfNodesLock.Unlock() - woc.markNodePhase(node.Name, wfv1.NodeFailed, fmt.Sprintf("workflow shutdown with strategy: %s", woc.GetShutdownStrategy())) + msg := fmt.Sprintf("workflow shutdown with strategy: %s", woc.GetShutdownStrategy()) + woc.handleExecutionControlError(nodeID, wfNodesLock, msg) return } // If we fail to delete the pod, fall back to setting the annotation @@ -52,10 +53,7 @@ func (woc *wfOperationCtx) applyExecutionControl(ctx context.Context, pod *apiv1 woc.log.Infof("Deleting Pending pod %s/%s which has exceeded workflow deadline %s", pod.Namespace, pod.Name, woc.workflowDeadline) err := woc.controller.kubeclientset.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}) if err == nil { - wfNodesLock.Lock() - node := woc.wf.Status.Nodes[pod.Name] - wfNodesLock.Unlock() - woc.markNodePhase(node.Name, wfv1.NodeFailed, "Step exceeded its deadline") + woc.handleExecutionControlError(nodeID, wfNodesLock, "Step exceeded its deadline") return } // If we fail to delete the pod, fall back to setting the annotation @@ -71,6 +69,14 @@ func (woc *wfOperationCtx) applyExecutionControl(ctx context.Context, pod *apiv1 } } +// handleExecutionControlError marks a node as failed with an error message +func (woc *wfOperationCtx) handleExecutionControlError(nodeID string, wfNodesLock *sync.RWMutex, errorMsg string) { + wfNodesLock.Lock() + node := woc.wf.Status.Nodes[nodeID] + wfNodesLock.Unlock() + woc.markNodePhase(node.Name, wfv1.NodeFailed, errorMsg) +} + // killDaemonedChildren kill any daemoned pods of a steps or DAG template node. func (woc *wfOperationCtx) killDaemonedChildren(nodeID string) { woc.log.Infof("Checking daemoned children of %s", nodeID) diff --git a/workflow/controller/indexes/indexes.go b/workflow/controller/indexes/indexes.go index 06a444bda1de..8d484288c788 100644 --- a/workflow/controller/indexes/indexes.go +++ b/workflow/controller/indexes/indexes.go @@ -9,6 +9,7 @@ package indexes const ( ClusterWorkflowTemplateIndex = "clusterworkflowtemplate" CronWorkflowIndex = "cronworkflow" + NodeIDIndex = "nodeID" WorkflowIndex = "workflow" WorkflowTemplateIndex = "workflowtemplate" WorkflowPhaseIndex = "workflow.phase" diff --git a/workflow/controller/indexes/workflow_index.go b/workflow/controller/indexes/workflow_index.go index 9fb19df1e091..e60278fdb9ea 100644 --- a/workflow/controller/indexes/workflow_index.go +++ b/workflow/controller/indexes/workflow_index.go @@ -32,6 +32,21 @@ func MetaWorkflowIndexFunc(obj interface{}) ([]string, error) { return []string{WorkflowIndexValue(m.GetNamespace(), name)}, nil } +// MetaNodeIDIndexFunc takes a kubernetes object and returns either the +// namespace and its node id or the namespace and its name +func MetaNodeIDIndexFunc(obj interface{}) ([]string, error) { + m, err := meta.Accessor(obj) + if err != nil { + return nil, err + } + + if nodeID, ok := m.GetAnnotations()[common.AnnotationKeyNodeID]; ok { + return []string{m.GetNamespace() + "/" + nodeID}, nil + } + + return []string{m.GetNamespace() + "/" + m.GetName()}, nil +} + func WorkflowIndexValue(namespace, name string) string { return namespace + "/" + name } diff --git a/workflow/controller/indexes/workflow_index_test.go b/workflow/controller/indexes/workflow_index_test.go index d757c4ba526c..446799bc592c 100644 --- a/workflow/controller/indexes/workflow_index_test.go +++ b/workflow/controller/indexes/workflow_index_test.go @@ -25,6 +25,43 @@ metadata: } } +func TestMetaNodeIDIndexFunc(t *testing.T) { + withNodeID := ` +apiVersion: v1 +kind: Pod +metadata: + namespace: my-ns + name: retry-test-p7jzr-whalesay-2308805457 + labels: + workflows.argoproj.io/workflow: my-wf + annotations: + workflows.argoproj.io/node-id: retry-test-p7jzr-2308805457 + workflows.argoproj.io/node-name: 'retry-test-p7jzr[0].steps-outer-step1' +` + withoutNodeID := ` +apiVersion: v1 +kind: Pod +metadata: + namespace: my-ns + name: retry-test-p7jzr-whalesay-2308805457 + labels: + workflows.argoproj.io/workflow: my-wf + annotations: + workflows.argoproj.io/node-name: 'retry-test-p7jzr[0].steps-outer-step1' +` + obj := &unstructured.Unstructured{} + wfv1.MustUnmarshal(withNodeID, obj) + v, err := MetaNodeIDIndexFunc(obj) + assert.NoError(t, err) + assert.Equal(t, []string{"my-ns/retry-test-p7jzr-2308805457"}, v) + + obj = &unstructured.Unstructured{} + wfv1.MustUnmarshal(withoutNodeID, obj) + v, err = MetaNodeIDIndexFunc(obj) + assert.NoError(t, err) + assert.Equal(t, []string{"my-ns/retry-test-p7jzr-whalesay-2308805457"}, v) +} + func TestWorkflowIndexValue(t *testing.T) { assert.Equal(t, "my-ns/my-wf", WorkflowIndexValue("my-ns", "my-wf")) } diff --git a/workflow/controller/node_counters.go b/workflow/controller/node_counters.go index 62b25375ff72..36faf247a591 100644 --- a/workflow/controller/node_counters.go +++ b/workflow/controller/node_counters.go @@ -1,8 +1,6 @@ package controller import ( - "fmt" - wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" ) @@ -55,7 +53,7 @@ func (woc *wfOperationCtx) getUnsuccessfulChildren(boundaryID string) int64 { } func (woc *wfOperationCtx) nodePodExist(node wfv1.NodeStatus) bool { - _, podExist, _ := woc.controller.podInformer.GetIndexer().GetByKey(fmt.Sprintf("%s/%s", woc.wf.Namespace, node.ID)) + _, podExist, _ := woc.podExists(node.ID) return podExist } diff --git a/workflow/controller/node_counters_test.go b/workflow/controller/node_counters_test.go index d32b73577f9f..b2b3a13db639 100644 --- a/workflow/controller/node_counters_test.go +++ b/workflow/controller/node_counters_test.go @@ -147,4 +147,17 @@ func TestCounters(t *testing.T) { assert.Equal(t, int64(2), woc.getActivePods("2")) assert.Equal(t, int64(2), woc.getActiveChildren("2")) assert.Equal(t, int64(2), woc.getUnsuccessfulChildren("2")) + + testNodePodExists(t, woc) +} + +func testNodePodExists(t *testing.T, woc *wfOperationCtx) { + for _, node := range woc.wf.Status.Nodes { + if node.ID == "" { + continue + } + + doesPodExist := woc.nodePodExist(node) + assert.True(t, doesPodExist) + } } diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 78f7194f76f0..a846911cc6b6 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -939,9 +939,7 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) error { woc.updateAgentPodStatus(ctx, pod) return } - nodeNameForPod := pod.Annotations[common.AnnotationKeyNodeName] - - nodeID := woc.wf.NodeID(nodeNameForPod) + nodeID := woc.nodeID(pod) seenPodLock.Lock() seenPods[nodeID] = pod seenPodLock.Unlock() @@ -967,7 +965,7 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) error { } woc.updated = true } - node := woc.wf.Status.Nodes[pod.ObjectMeta.Name] + node := woc.wf.Status.Nodes[nodeID] match := true if woc.execWf.Spec.PodGC.GetLabelSelector() != nil { var podLabels labels.Set = pod.GetLabels() @@ -1032,7 +1030,7 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) error { // grace-period to allow informer sync recentlyStarted := recentlyStarted(node) - woc.log.WithFields(log.Fields{"podName": node.Name, "nodePhase": node.Phase, "recentlyStarted": recentlyStarted}).Info("Workflow pod is missing") + woc.log.WithFields(log.Fields{"nodeName": node.Name, "nodePhase": node.Phase, "recentlyStarted": recentlyStarted}).Info("Workflow pod is missing") metrics.PodMissingMetric.WithLabelValues(strconv.FormatBool(recentlyStarted), string(node.Phase)).Inc() // If the node is pending and the pod does not exist, it could be the case that we want to try to submit it @@ -1059,6 +1057,14 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) error { return nil } +func (woc *wfOperationCtx) nodeID(pod *apiv1.Pod) string { + nodeID, ok := pod.Annotations[common.AnnotationKeyNodeID] + if !ok { + nodeID = woc.wf.NodeID(pod.Annotations[common.AnnotationKeyNodeName]) + } + return nodeID +} + func recentlyStarted(node wfv1.NodeStatus) bool { return time.Since(node.StartedAt.Time) <= envutil.LookupEnvDurationOr("RECENTLY_STARTED_POD_DURATION", 10*time.Second) } diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go index 6bbc80ed5747..edc5857a63ae 100644 --- a/workflow/controller/operator_test.go +++ b/workflow/controller/operator_test.go @@ -1549,36 +1549,25 @@ spec: // TestWorkflowParallelismLimit verifies parallelism at a workflow level is honored. func TestWorkflowParallelismLimit(t *testing.T) { - cancel, controller := newController() - defer cancel() - ctx := context.Background() - wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("default") wf := wfv1.MustUnmarshalWorkflow(workflowParallelismLimit) - wf, err := wfcset.Create(ctx, wf, metav1.CreateOptions{}) - assert.NoError(t, err) + cancel, controller := newController(wf) + defer cancel() - wf, err = wfcset.Get(ctx, wf.ObjectMeta.Name, metav1.GetOptions{}) - assert.NoError(t, err) woc := newWorkflowOperationCtx(wf, controller) woc.operate(ctx) pods, err := listPods(woc) assert.NoError(t, err) - assert.Equal(t, 2, len(pods.Items)) - // operate again and make sure we don't schedule any more pods - makePodsPhase(ctx, woc, apiv1.PodRunning) + assert.Len(t, pods.Items, 2) - syncPodsInformer(ctx, woc) + makePodsPhase(ctx, woc, apiv1.PodRunning) - wf, err = wfcset.Get(ctx, wf.ObjectMeta.Name, metav1.GetOptions{}) - assert.NoError(t, err) - // wfBytes, _ := json.MarshalIndent(wf, "", " ") - // log.Printf("%s", wfBytes) - woc = newWorkflowOperationCtx(wf, controller) + // operate again and make sure we don't schedule any more pods + woc = newWorkflowOperationCtx(woc.wf, controller) woc.operate(ctx) pods, err = listPods(woc) assert.NoError(t, err) - assert.Equal(t, 2, len(pods.Items)) + assert.Len(t, pods.Items, 2) } var stepsTemplateParallelismLimit = ` diff --git a/workflow/controller/workflowpod.go b/workflow/controller/workflowpod.go index 34d432139cde..9ec6a57e2ba6 100644 --- a/workflow/controller/workflowpod.go +++ b/workflow/controller/workflowpod.go @@ -9,12 +9,13 @@ import ( "strconv" "time" + "github.com/argoproj/argo-workflows/v3/workflow/controller/indexes" + log "github.com/sirupsen/logrus" apiv1 "k8s.io/api/core/v1" apierr "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/strategicpatch" - "k8s.io/client-go/tools/cache" "k8s.io/utils/pointer" "github.com/argoproj/argo-workflows/v3/config" @@ -142,16 +143,14 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin // we must check to see if the pod exists rather than just optimistically creating the pod and see if we get // an `AlreadyExists` error because we won't get that error if there is not enough resources. // Performance enhancement: Code later in this func is expensive to execute, so return quickly if we can. - obj, exists, err := woc.controller.podInformer.GetStore().Get(cache.ExplicitKey(woc.wf.Namespace + "/" + nodeID)) + existing, exists, err := woc.podExists(nodeID) if err != nil { - return nil, fmt.Errorf("failed to get pod from informer store: %w", err) + return nil, err } + if exists { - existing, ok := obj.(*apiv1.Pod) - if ok { - woc.log.WithField("podPhase", existing.Status.Phase).Debugf("Skipped pod %s (%s) creation: already exists", nodeName, nodeID) - return existing, nil - } + woc.log.WithField("podPhase", existing.Status.Phase).Debugf("Skipped pod %s (%s) creation: already exists", nodeName, nodeID) + return existing, nil } if !woc.GetShutdownStrategy().ShouldExecute(opts.onExitPod) { @@ -205,10 +204,9 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin activeDeadlineSeconds = tmplActiveDeadlineSeconds } } - pod := &apiv1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: nodeID, + Name: util.PodName(woc.wf.Name, nodeName, tmpl.Name, nodeID), Namespace: woc.wf.ObjectMeta.Namespace, Labels: map[string]string{ common.LabelKeyWorkflow: woc.wf.ObjectMeta.Name, // Allows filtering by pods related to specific workflow @@ -216,6 +214,7 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin }, Annotations: map[string]string{ common.AnnotationKeyNodeName: nodeName, + common.AnnotationKeyNodeID: nodeID, }, OwnerReferences: []metav1.OwnerReference{ *metav1.NewControllerRef(woc.wf, wfv1.SchemeGroupVersion.WithKind(workflow.WorkflowKind)), @@ -437,20 +436,20 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin return nil, ErrResourceRateLimitReached } - woc.log.Debugf("Creating Pod: %s (%s)", nodeName, nodeID) + woc.log.Debugf("Creating Pod: %s (%s)", nodeName, pod.Name) created, err := woc.controller.kubeclientset.CoreV1().Pods(woc.wf.ObjectMeta.Namespace).Create(ctx, pod, metav1.CreateOptions{}) if err != nil { if apierr.IsAlreadyExists(err) { // workflow pod names are deterministic. We can get here if the // controller fails to persist the workflow after creating the pod. - woc.log.Infof("Failed pod %s (%s) creation: already exists", nodeName, nodeID) + woc.log.Infof("Failed pod %s (%s) creation: already exists", nodeName, pod.Name) return created, nil } if errorsutil.IsTransientErr(err) { return nil, err } - woc.log.Infof("Failed to create pod %s (%s): %v", nodeName, nodeID, err) + woc.log.Infof("Failed to create pod %s (%s): %v", nodeName, pod.Name, err) return nil, errors.InternalWrapError(err) } woc.log.Infof("Created pod: %s (%s)", nodeName, created.Name) @@ -458,6 +457,29 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin return created, nil } +func (woc *wfOperationCtx) podExists(nodeID string) (existing *apiv1.Pod, exists bool, err error) { + objs, err := woc.controller.podInformer.GetIndexer().ByIndex(indexes.NodeIDIndex, woc.wf.Namespace+"/"+nodeID) + if err != nil { + return nil, false, fmt.Errorf("failed to get pod from informer store: %w", err) + } + + objectCount := len(objs) + + if objectCount == 0 { + return nil, false, nil + } + + if objectCount > 1 { + return nil, false, fmt.Errorf("expected < 2 pods, got %d - this is a bug", len(objs)) + } + + if existing, ok := objs[0].(*apiv1.Pod); ok { + return existing, true, nil + } + + return nil, false, nil +} + func (woc *wfOperationCtx) getDeadline(opts *createWorkflowPodOpts) *time.Time { deadline := time.Time{} if woc.workflowDeadline != nil { diff --git a/workflow/controller/workflowpod_test.go b/workflow/controller/workflowpod_test.go index 4740fcd7d922..82b26fc4610e 100644 --- a/workflow/controller/workflowpod_test.go +++ b/workflow/controller/workflowpod_test.go @@ -1569,3 +1569,28 @@ func TestPodMetadataWithWorkflowDefaults(t *testing.T) { assert.Equal(t, "label-value", pod.ObjectMeta.Labels["controller-level-pod-label"]) cancel() } + +func TestPodExists(t *testing.T) { + cancel, controller := newController() + defer cancel() + + wf := wfv1.MustUnmarshalWorkflow(helloWorldWf) + ctx := context.Background() + woc := newWorkflowOperationCtx(wf, controller) + err := woc.setExecWorkflow(ctx) + assert.NoError(t, err) + mainCtr := woc.execWf.Spec.Templates[0].Container + pod, err := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{}) + assert.NoError(t, err) + assert.NotNil(t, pod) + + pods, err := listPods(woc) + assert.NoError(t, err) + assert.Len(t, pods.Items, 1) + + existingPod, doesExist, err := woc.podExists(pod.Name) + assert.NoError(t, err) + assert.NotNil(t, existingPod) + assert.True(t, doesExist) + assert.EqualValues(t, pod, existingPod) +} diff --git a/workflow/util/pod_name.go b/workflow/util/pod_name.go new file mode 100644 index 000000000000..09f4c1079960 --- /dev/null +++ b/workflow/util/pod_name.go @@ -0,0 +1,40 @@ +package util + +import ( + "fmt" + "hash/fnv" + "os" +) + +const ( + maxK8sResourceNameLength = 253 + k8sNamingHashLength = 10 +) + +// PodName return a deterministic pod name +func PodName(workflowName, nodeName, templateName, nodeID string) string { + if os.Getenv("POD_NAMES") == "v1" { + return nodeID + } + + if workflowName == nodeName { + return workflowName + } + + prefix := fmt.Sprintf("%s-%s", workflowName, templateName) + prefix = ensurePodNamePrefixLength(prefix) + + h := fnv.New32a() + _, _ = h.Write([]byte(nodeName)) + return fmt.Sprintf("%s-%v", prefix, h.Sum32()) +} + +func ensurePodNamePrefixLength(prefix string) string { + maxPrefixLength := maxK8sResourceNameLength - k8sNamingHashLength + + if len(prefix) > maxPrefixLength-1 { + return prefix[0 : maxPrefixLength-1] + } + + return prefix +} diff --git a/workflow/util/pod_name_test.go b/workflow/util/pod_name_test.go new file mode 100644 index 000000000000..cb6e4f93dcbf --- /dev/null +++ b/workflow/util/pod_name_test.go @@ -0,0 +1,39 @@ +package util + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestPodName(t *testing.T) { + nodeName := "nodename" + nodeID := "1" + + // short case + shortWfName := "wfname" + shortTemplateName := "templatename" + + expected := fmt.Sprintf("%s-%s", shortWfName, shortTemplateName) + actual := ensurePodNamePrefixLength(expected) + assert.Equal(t, expected, actual) + + name := PodName(shortWfName, nodeName, shortTemplateName, nodeID) + assert.Equal(t, "wfname-templatename-1454367246", name) + + // long case + longWfName := "alongworkflownamethatincludeslotsofdetailsandisessentiallyalargerunonsentencewithpoorstyleandnopunctuationtobehadwhatsoever" + longTemplateName := "alongtemplatenamethatincludessliightlymoredetailsandiscertainlyalargerunonstnencewithevenworsestylisticconcernsandpreposterouslyeliminatespunctuation" + + sum := len(longWfName) + len(longTemplateName) + assert.Greater(t, sum, maxK8sResourceNameLength-k8sNamingHashLength) + + expected = fmt.Sprintf("%s-%s", longWfName, longTemplateName) + actual = ensurePodNamePrefixLength(expected) + + assert.Equal(t, maxK8sResourceNameLength-k8sNamingHashLength-1, len(actual)) + + name = PodName(longWfName, nodeName, longTemplateName, nodeID) + assert.Equal(t, maxK8sResourceNameLength, len(name)) +} diff --git a/workflow/util/util.go b/workflow/util/util.go index 8c7fd9a44545..ddfb180b1d3d 100644 --- a/workflow/util/util.go +++ b/workflow/util/util.go @@ -804,7 +804,8 @@ func retryWorkflow(ctx context.Context, kubeClient kubernetes.Interface, hydrato } if node.Type == wfv1.NodeTypePod { log.Infof("Deleting pod: %s", node.ID) - err := podIf.Delete(ctx, node.ID, metav1.DeleteOptions{}) + podName := PodName(wf.Name, node.Name, node.TemplateName, node.ID) + err := podIf.Delete(ctx, podName, metav1.DeleteOptions{}) if err != nil && !apierr.IsNotFound(err) { return nil, errors.InternalWrapError(err) }