@@ -13,6 +13,7 @@ import (
13
13
"google.golang.org/grpc/status"
14
14
batchv1 "k8s.io/api/batch/v1"
15
15
corev1 "k8s.io/api/core/v1"
16
+ "k8s.io/apimachinery/pkg/api/errors"
16
17
k8sresource "k8s.io/apimachinery/pkg/api/resource"
17
18
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18
19
"k8s.io/utils/pointer"
@@ -143,13 +144,59 @@ func (p *TaskLauncher) StopTask(
143
144
log hclog.Logger ,
144
145
ti * TaskInfo ,
145
146
) error {
146
- // Purposely do nothing. We leverage the job TTL feature in Kube 1.19+
147
- // so that Kubernetes automatically deletes old jobs after they complete
148
- // running.
147
+ // If a job completes and the coresponding pod exits with a "completed"
148
+ // status, we urposely do nothing here. We leverage the job TTL feature in
149
+ // Kube 1.19+ so that Kubernetes automatically deletes old jobs and pods
150
+ // after they complete running.
149
151
//
150
- // In the future, we may want to get more clever about this and explicitly
151
- // delete jobs under certain conditions, but for now we leave them around
152
- // and let K8S clean it up
152
+ // If a Waypoint job is cancelled or otherwise times out, we check for
153
+ // existing Kubernetes jobs and delete them, and clean up any Pending
154
+ // containers.
155
+ clientSet , ns , _ , err := Clientset (p .config .KubeconfigPath , p .config .Context )
156
+ if err != nil {
157
+ return err
158
+ }
159
+ if p .config .Namespace != "" {
160
+ ns = p .config .Namespace
161
+ }
162
+
163
+ // Delete the job. This does *not* delete any running pods that the job
164
+ // created.
165
+ jobsClient := clientSet .BatchV1 ().Jobs (ns )
166
+ if err := jobsClient .Delete (ctx , ti .Id , metav1.DeleteOptions {}); err != nil {
167
+ if ! errors .IsNotFound (err ) {
168
+ return err
169
+ }
170
+ }
171
+
172
+ // List pods with this job label
173
+ podsClient := clientSet .CoreV1 ().Pods (ns )
174
+ pods , err := podsClient .List (ctx , metav1.ListOptions {
175
+ LabelSelector : fmt .Sprintf ("job-name=%s" , ti .Id ),
176
+ })
177
+ // It's not clear from the documentation if an error is returned from the
178
+ // List API call if no jobs are found, so we guard here just in case
179
+ if err != nil && ! errors .IsNotFound (err ) {
180
+ return err
181
+ }
182
+
183
+ if pods == nil {
184
+ log .Info ("no pods found for job, returning" , "job_id" , ti .Id )
185
+ return nil
186
+ }
187
+
188
+ // Delete any pods stuck in pending
189
+ for _ , p := range pods .Items {
190
+ if p .Status .Phase == corev1 .PodPending {
191
+ log .Warn ("job pod is in pending phase in StopTask operation, cancelling" , "job_id" , ti .Id )
192
+ if err := podsClient .Delete (ctx , p .Name , metav1.DeleteOptions {}); err != nil {
193
+ if ! errors .IsNotFound (err ) {
194
+ return err
195
+ }
196
+ }
197
+ }
198
+ }
199
+
153
200
return nil
154
201
}
155
202
@@ -205,8 +252,8 @@ func (p *TaskLauncher) StartTask(
205
252
}
206
253
207
254
// Get container resource limits and requests
208
- var resourceLimits = make (map [corev1.ResourceName ]k8sresource.Quantity )
209
- var resourceRequests = make (map [corev1.ResourceName ]k8sresource.Quantity )
255
+ resourceLimits : = make (map [corev1.ResourceName ]k8sresource.Quantity )
256
+ resourceRequests : = make (map [corev1.ResourceName ]k8sresource.Quantity )
210
257
resourceRequirements := corev1.ResourceRequirements {
211
258
Limits : resourceLimits ,
212
259
Requests : resourceRequests ,
0 commit comments