diff --git a/.changelog/3143.txt b/.changelog/3143.txt new file mode 100644 index 00000000000..e112481a8ec --- /dev/null +++ b/.changelog/3143.txt @@ -0,0 +1,3 @@ +```release-note:bug +plugin/k8s: clean up pending pods from cancelled jobs +``` diff --git a/builtin/k8s/platform.go b/builtin/k8s/platform.go index dd325776876..92512c18d2e 100644 --- a/builtin/k8s/platform.go +++ b/builtin/k8s/platform.go @@ -480,7 +480,7 @@ func configureContainer( } resourceRequests[resourceName] = q } else { - log.Warn("ignoring unrecognized k8s resources key: %q", k) + log.Warn("ignoring unrecognized k8s resources key", "key", k) } } @@ -622,7 +622,7 @@ func (p *Platform) resourceDeploymentCreate( // App container must have some kind of port if len(appContainerSpec.Ports) == 0 { - log.Warn("No ports defined in waypoint.hcl - defaulting to http on port %d", DefaultServicePort) + log.Warn("No ports defined in waypoint.hcl - defaulting to http on port", "port", DefaultServicePort) appContainerSpec.Ports = append(appContainerSpec.Ports, &Port{Port: DefaultServicePort, Name: "http"}) } diff --git a/builtin/k8s/task.go b/builtin/k8s/task.go index 7ae6bfa641f..bd0a3b087c0 100644 --- a/builtin/k8s/task.go +++ b/builtin/k8s/task.go @@ -13,6 +13,7 @@ import ( "google.golang.org/grpc/status" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" k8sresource "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/utils/pointer" @@ -143,13 +144,59 @@ func (p *TaskLauncher) StopTask( log hclog.Logger, ti *TaskInfo, ) error { - // Purposely do nothing. We leverage the job TTL feature in Kube 1.19+ - // so that Kubernetes automatically deletes old jobs after they complete - // running. + // If a job completes and the corresponding pod exits with a "completed" + // status, we purposely do nothing here. 
We leverage the job TTL feature in + // Kube 1.19+ so that Kubernetes automatically deletes old jobs and pods + // after they complete running. // - // In the future, we may want to get more clever about this and explicitly - // delete jobs under certain conditions, but for now we leave them around - // and let K8S clean it up + // If a Waypoint job is cancelled or otherwise times out, we check for + // existing Kubernetes jobs and delete them, and clean up any Pending + // containers. + clientSet, ns, _, err := Clientset(p.config.KubeconfigPath, p.config.Context) + if err != nil { + return err + } + if p.config.Namespace != "" { + ns = p.config.Namespace + } + + // Delete the job. This does *not* delete any running pods that the job + // created. + jobsClient := clientSet.BatchV1().Jobs(ns) + if err := jobsClient.Delete(ctx, ti.Id, metav1.DeleteOptions{}); err != nil { + if !errors.IsNotFound(err) { + return err + } + } + + // List pods with this job label + podsClient := clientSet.CoreV1().Pods(ns) + pods, err := podsClient.List(ctx, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("job-name=%s", ti.Id), + }) + // It's not clear from the documentation if an error is returned from the + // List API call if no pods are found, so we guard here just in case + if err != nil && !errors.IsNotFound(err) { + return err + } + + if pods == nil { + log.Info("no pods found for job, returning", "job_id", ti.Id) + return nil + } + + // Delete any pods stuck in pending + for _, p := range pods.Items { + if p.Status.Phase == corev1.PodPending { + log.Warn("job pod is in pending phase in StopTask operation, cancelling", "job_id", ti.Id) + if err := podsClient.Delete(ctx, p.Name, metav1.DeleteOptions{}); err != nil { + if !errors.IsNotFound(err) { + return err + } + } + } + } + return nil } @@ -205,8 +252,8 @@ func (p *TaskLauncher) StartTask( } // Get container resource limits and requests - var resourceLimits = make(map[corev1.ResourceName]k8sresource.Quantity) - var 
resourceRequests = make(map[corev1.ResourceName]k8sresource.Quantity) + resourceLimits := make(map[corev1.ResourceName]k8sresource.Quantity) + resourceRequests := make(map[corev1.ResourceName]k8sresource.Quantity) resourceRequirements := corev1.ResourceRequirements{ Limits: resourceLimits, Requests: resourceRequests,