278 changes: 208 additions & 70 deletions CHANGELOG/CHANGELOG-1.32.md

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions cmd/kubeadm/app/util/etcd/etcd.go
@@ -576,6 +576,15 @@ func (c *Client) MemberPromote(learnerID uint64) error {
ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
defer cancel()

isLearner, err := c.isLearner(learnerID)
if err != nil {
return false, err
}
if !isLearner {
klog.V(1).Infof("[etcd] Member %s was already promoted.", strconv.FormatUint(learnerID, 16))
return true, nil
}

_, err = cli.MemberPromote(ctx, learnerID)
if err == nil {
klog.V(1).Infof("[etcd] The learner was promoted as a voting member: %s", strconv.FormatUint(learnerID, 16))
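The hunk above makes kubeadm's learner promotion idempotent: it now checks whether the member is still a learner before calling promote, instead of failing when the member has already been promoted. A minimal standalone sketch of the same pattern against the etcd v3 client — the function name, timeout, and client setup are illustrative assumptions, not taken from this PR:

package etcdutil

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// promoteIfLearner promotes memberID only if it is still a learner; a member
// that has already been promoted is treated as success rather than an error.
func promoteIfLearner(cli *clientv3.Client, memberID uint64) error {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	resp, err := cli.MemberList(ctx)
	if err != nil {
		return fmt.Errorf("listing members: %w", err)
	}
	for _, m := range resp.Members {
		if m.ID != memberID {
			continue
		}
		if !m.IsLearner {
			// Already a voting member; nothing to do.
			return nil
		}
		_, err = cli.MemberPromote(ctx, memberID)
		return err
	}
	return fmt.Errorf("member %x not found", memberID)
}
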
2 changes: 1 addition & 1 deletion openshift-hack/images/hyperkube/Dockerfile.rhel
@@ -14,4 +14,4 @@ COPY --from=builder /tmp/build/* /usr/bin/
LABEL io.k8s.display-name="OpenShift Kubernetes Server Commands" \
io.k8s.description="OpenShift is a platform for developing, building, and deploying containerized applications." \
io.openshift.tags="openshift,hyperkube" \
io.openshift.build.versions="kubernetes=1.32.6"
io.openshift.build.versions="kubernetes=1.32.7"
6 changes: 6 additions & 0 deletions pkg/controller/job/job_controller.go
@@ -537,6 +537,12 @@ func (jm *Controller) deleteJob(logger klog.Logger, obj interface{}) {
}
}
jm.enqueueLabelSelector(jobObj)

key := cache.MetaObjectToName(jobObj).String()
err := jm.podBackoffStore.removeBackoffRecord(key)
if err != nil {
utilruntime.HandleError(fmt.Errorf("error removing backoff record %w", err))
}
}

func (jm *Controller) enqueueLabelSelector(jobObj *batch.Job) {
3 changes: 3 additions & 0 deletions pkg/kubelet/images/image_gc_manager.go
@@ -494,7 +494,10 @@ func (im *realImageGCManager) freeImage(ctx context.Context, image evictionInfo,
if isRuntimeClassInImageCriAPIEnabled {
imageKey = getImageTuple(image.id, image.runtimeHandlerUsedToPullImage)
}

im.imageRecordsLock.Lock()
delete(im.imageRecords, imageKey)
im.imageRecordsLock.Unlock()

metrics.ImageGarbageCollectedTotal.WithLabelValues(reason).Inc()
return err
3 changes: 2 additions & 1 deletion pkg/registry/batch/job/strategy.go
@@ -379,6 +379,7 @@ func getStatusValidationOptions(newJob, oldJob *batch.Job) batchvalidation.JobSt
isUncountedTerminatedPodsChanged := !apiequality.Semantic.DeepEqual(oldJob.Status.UncountedTerminatedPods, newJob.Status.UncountedTerminatedPods)
isReadyChanged := !ptr.Equal(oldJob.Status.Ready, newJob.Status.Ready)
isTerminatingChanged := !ptr.Equal(oldJob.Status.Terminating, newJob.Status.Terminating)
isSuspendedWithZeroCompletions := ptr.Equal(newJob.Spec.Suspend, ptr.To(true)) && ptr.Equal(newJob.Spec.Completions, ptr.To[int32](0))

return batchvalidation.JobStatusValidationOptions{
// We allow to decrease the counter for succeeded pods for jobs which
@@ -394,7 +395,7 @@ func getStatusValidationOptions(newJob, oldJob *batch.Job) batchvalidation.JobSt
RejectFailedJobWithoutFailureTarget: isJobFailedChanged || isFailedIndexesChanged,
RejectCompleteJobWithoutSuccessCriteriaMet: isJobCompleteChanged || isJobSuccessCriteriaMetChanged,
RejectFinishedJobWithActivePods: isJobFinishedChanged || isActiveChanged,
RejectFinishedJobWithoutStartTime: isJobFinishedChanged || isStartTimeChanged,
RejectFinishedJobWithoutStartTime: (isJobFinishedChanged || isStartTimeChanged) && !isSuspendedWithZeroCompletions,
RejectFinishedJobWithUncountedTerminatedPods: isJobFinishedChanged || isUncountedTerminatedPodsChanged,
RejectStartTimeUpdateForUnsuspendedJob: isStartTimeChanged,
RejectCompletionTimeBeforeStartTime: isStartTimeChanged || isCompletionTimeChanged,
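The relaxation above applies only to Jobs that are suspended with completions=0, which may legitimately finish without ever getting a startTime. A hedged sketch of just that predicate, written against the public batch/v1 types and k8s.io/utils/ptr rather than the internal API the strategy code uses; the function and variable names are illustrative:

package main

import (
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	"k8s.io/utils/ptr"
)

// isSuspendedWithZeroCompletions mirrors the condition added in strategy.go:
// both pointer fields must be set and equal to true and 0 respectively.
func isSuspendedWithZeroCompletions(spec *batchv1.JobSpec) bool {
	return ptr.Equal(spec.Suspend, ptr.To(true)) &&
		ptr.Equal(spec.Completions, ptr.To[int32](0))
}

func main() {
	spec := &batchv1.JobSpec{
		Suspend:     ptr.To(true),
		Completions: ptr.To[int32](0),
	}
	fmt.Println(isSuspendedWithZeroCompletions(spec)) // true
}
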
30 changes: 30 additions & 0 deletions pkg/registry/batch/job/strategy_test.go
@@ -3561,6 +3561,36 @@ func TestStatusStrategy_ValidateUpdate(t *testing.T) {
{Type: field.ErrorTypeInvalid, Field: "status.ready"},
},
},
"valid transition to Complete for suspended Job with completions=0; without startTime": {
enableJobManagedBy: true,
job: &batch.Job{
ObjectMeta: validObjectMeta,
Spec: batch.JobSpec{
Completions: ptr.To[int32](0),
Suspend: ptr.To(true),
},
},
newJob: &batch.Job{
ObjectMeta: validObjectMeta,
Spec: batch.JobSpec{
Completions: ptr.To[int32](0),
Suspend: ptr.To(true),
},
Status: batch.JobStatus{
CompletionTime: &now,
Conditions: []batch.JobCondition{
{
Type: batch.JobSuccessCriteriaMet,
Status: api.ConditionTrue,
},
{
Type: batch.JobComplete,
Status: api.ConditionTrue,
},
},
},
},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
120 changes: 120 additions & 0 deletions test/integration/job/job_test.go
@@ -2271,6 +2271,103 @@ func TestManagedBy_Reenabling(t *testing.T) {
})
}

// TestImmediateJobRecreation verifies that the replacement Job creates the Pods
// quickly after re-creation, see https://github.com/kubernetes/kubernetes/issues/132042.
func TestImmediateJobRecreation(t *testing.T) {
// set the backoff delay very high to make sure the test does not pass by waiting long on asserts
t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, 2*wait.ForeverTestTimeout))
closeFn, restConfig, clientSet, ns := setup(t, "recreate-job-immediately")
t.Cleanup(closeFn)
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
t.Cleanup(cancel)

baseJob := batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Namespace: ns.Name,
},
Spec: batchv1.JobSpec{
Completions: ptr.To[int32](1),
Parallelism: ptr.To[int32](1),
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "main-container",
Image: "foo",
},
},
},
},
},
}
jobSpec := func(idx int) batchv1.Job {
spec := baseJob.DeepCopy()
spec.Name = fmt.Sprintf("test-job-%d", idx)
return *spec
}

var jobObjs []*batchv1.Job
// We create multiple Jobs to make the repro more likely. In particular, we need
// more Jobs than the number of Job controller workers to make it very unlikely
// that syncJob executes (and cleans the in-memory state) before the corresponding
// replacement Jobs are created.
for i := 0; i < 3; i++ {
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, ptr.To(jobSpec(i)))
if err != nil {
t.Fatalf("Error %v when creating the job %q", err, klog.KObj(jobObj))
}
jobObjs = append(jobObjs, jobObj)
}

for _, jobObj := range jobObjs {
validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
Active: 1,
Ready: ptr.To[int32](0),
Terminating: ptr.To[int32](0),
})

if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
t.Fatalf("Error %v when setting phase %s on the pod of job %v", err, v1.PodFailed, klog.KObj(jobObj))
}

// Await to account for the failed Pod
validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
Failed: 1,
Ready: ptr.To[int32](0),
Terminating: ptr.To[int32](0),
})
}

for i := 0; i < len(jobObjs); i++ {
jobObj := jobObjs[i]
jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace)
if err := jobClient.Delete(ctx, jobObj.Name, metav1.DeleteOptions{
// Use propagationPolicy=background so that we don't need to wait for the job object to be gone.
PropagationPolicy: ptr.To(metav1.DeletePropagationBackground),
}); err != nil {
t.Fatalf("Error %v when deleting the job %v", err, klog.KObj(jobObj))
}

// re-create the job immediately
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, ptr.To(jobSpec(i)))
if err != nil {
t.Fatalf("Error %q while creating the job %q", err, klog.KObj(jobObj))
}
jobObjs[i] = jobObj
}

// total timeout (3*5s) is less than 2*ForeverTestTimeout.
for _, jobObj := range jobObjs {
// wait max 5s for the Active=1. This assert verifies that the backoff
// delay is not applied to the replacement instance of the Job.
validateJobsPodsStatusOnlyWithTimeout(ctx, t, clientSet, jobObj, podsByStatus{
Active: 1,
Ready: ptr.To[int32](0),
Terminating: ptr.To[int32](0),
}, 5*time.Second)
}
}

// TestManagedBy_RecreatedJob verifies that the Job controller skips
// reconciliation of a job with managedBy field, when this is a recreated job,
// and there is still a pending sync queued for the previous job.
@@ -3965,6 +4062,29 @@ func TestSuspendJob(t *testing.T) {
}
}

// TestSuspendJobWithZeroCompletions verifies the suspended Job with
// completions=0 is marked as Complete.
func TestSuspendJobWithZeroCompletions(t *testing.T) {
closeFn, restConfig, clientSet, ns := setup(t, "suspended-with-zero-completions")
t.Cleanup(closeFn)
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
t.Cleanup(func() {
cancel()
})
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
Spec: batchv1.JobSpec{
Completions: ptr.To[int32](0),
Suspend: ptr.To(true),
},
})
if err != nil {
t.Fatalf("Failed to create Job: %v", err)
}
for _, condition := range []batchv1.JobConditionType{batchv1.JobSuccessCriteriaMet, batchv1.JobComplete} {
validateJobCondition(ctx, t, clientSet, jobObj, condition)
}
}

func TestSuspendJobControllerRestart(t *testing.T) {
closeFn, restConfig, clientSet, ns := setup(t, "suspend")
t.Cleanup(closeFn)