Skip to content

Commit

Permalink
[BUG-5656] Annotate Jobs with parent ScaledJob generation (kedacore#5876
Browse files Browse the repository at this point in the history
)

* Annotate Jobs with parent ScaledJob generation

Signed-off-by: Josef Karasek <[email protected]>

* fix tests

Signed-off-by: Josef Karasek <[email protected]>

* fix lint

Signed-off-by: Josef Karasek <[email protected]>

* fix log message

Signed-off-by: Josef Karasek <[email protected]>

* update changelog

Signed-off-by: Josef Karasek <[email protected]>

* update changelog

Signed-off-by: Josef Karasek <[email protected]>

* update changelog

Signed-off-by: Josef Karasek <[email protected]>

---------

Signed-off-by: Josef Karasek <[email protected]>
Signed-off-by: Zbynek Roubalik <[email protected]>
Co-authored-by: Zbynek Roubalik <[email protected]>
  • Loading branch information
2 people authored and JorTurFer committed Jul 30, 2024
1 parent e7ad9c3 commit d8dac49
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 15 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,17 @@ Here is an overview of all new **experimental** features:
### Improvements

- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
- **General**: Added `eagerScalingStrategy` for `ScaledJob` ([#5114](https://github.com/kedacore/keda/issues/5114))
- **General**: Do not delete running Jobs on KEDA restart ([#5656](https://github.com/kedacore/keda/issues/5656))
- **Azure queue scaler**: Added new configuration option 'queueLengthStrategy' ([#4478](https://github.com/kedacore/keda/issues/4478))
- **Cassandra Scaler**: Add TLS support for cassandra scaler ([#5802](https://github.com/kedacore/keda/issues/5802))
- **GCP Pub/Sub**: Add optional valueIfNull to allow a default scaling value and prevent errors when GCP metric returns no value. ([#5896](https://github.com/kedacore/keda/issues/5896))
- **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778))
- **GitHub Scaler**: Fixed pagination, fetching repository list ([#5738](https://github.com/kedacore/keda/issues/5738))
- **IBM MQ Scaler**: Add TLS support for IBM MQ scaler ([#5974](https://github.com/kedacore/keda/issues/5974))
- **Kafka**: Fix logic to scale to zero on invalid offset even with earliest offsetResetPolicy ([#5689](https://github.com/kedacore/keda/issues/5689))
- **MYSQL Scaler**: Add support to fetch username from env ([#5883](https://github.com/kedacore/keda/issues/5883))
- **Postgres Scaler**: Add support for access token authentication to an Azure Postgres Flexible Server ([#5823](https://github.com/kedacore/keda/issues/5823))

### Fixes

Expand Down
38 changes: 26 additions & 12 deletions controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,22 +279,36 @@ func (r *ScaledJobReconciler) deletePreviousVersionScaleJobs(ctx context.Context
return "Cannot get list of Jobs owned by this scaledJob", err
}

if len(jobs.Items) > 0 {
logger.Info("RolloutStrategy: immediate, Deleting jobs owned by the previous version of the scaledJob", "numJobsToDelete", len(jobs.Items))
jobIndexes := make([]int, 0, len(jobs.Items))
scaledJobGeneration := strconv.FormatInt(scaledJob.Generation, 10)
for i, job := range jobs.Items {
if jobGen, ok := job.Annotations["scaledjob.keda.sh/generation"]; !ok {
// delete Jobs that don't have the generation annotation
jobIndexes = append(jobIndexes, i)
} else if jobGen != scaledJobGeneration {
// delete Jobs that have a different generation annotation
jobIndexes = append(jobIndexes, i)
}
}
for _, job := range jobs.Items {
job := job

propagationPolicy := metav1.DeletePropagationBackground
if scaledJob.Spec.Rollout.PropagationPolicy == "foreground" {
propagationPolicy = metav1.DeletePropagationForeground
}
err = r.Client.Delete(ctx, &job, client.PropagationPolicy(propagationPolicy))
if err != nil {
return "Not able to delete job: " + job.Name, err
if len(jobIndexes) == 0 {
logger.Info("RolloutStrategy: immediate, No jobs owned by the previous version of the scaledJob")
} else {
logger.Info("RolloutStrategy: immediate, Deleting jobs owned by the previous version of the scaledJob", "numJobsToDelete", len(jobIndexes))
for _, index := range jobIndexes {
job := jobs.Items[index]

propagationPolicy := metav1.DeletePropagationBackground
if scaledJob.Spec.Rollout.PropagationPolicy == "foreground" {
propagationPolicy = metav1.DeletePropagationForeground
}
err = r.Client.Delete(ctx, &job, client.PropagationPolicy(propagationPolicy))
if err != nil {
return "Not able to delete job: " + job.Name, err
}
}
return fmt.Sprintf("RolloutStrategy: immediate, deleted jobs owned by the previous version of the scaleJob: %d jobs deleted", len(jobIndexes)), nil
}
return fmt.Sprintf("RolloutStrategy: immediate, deleted jobs owned by the previous version of the scaleJob: %d jobs deleted", len(jobs.Items)), nil
}
return fmt.Sprintf("RolloutStrategy: %s", scaledJob.Spec.RolloutStrategy), nil
}
Expand Down
13 changes: 12 additions & 1 deletion pkg/scaling/executor/scale_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ func (e *scaleExecutor) getScalingDecision(scaledJob *kedav1alpha1.ScaledJob, ru
}

func (e *scaleExecutor) createJobs(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob, scaleTo int64, maxScale int64) {
if maxScale <= 0 {
logger.Info("No need to create jobs - all requested jobs already exist", "jobs", maxScale)
return
}
logger.Info("Creating jobs", "Effective number of max jobs", maxScale)
if scaleTo > maxScale {
scaleTo = maxScale
Expand Down Expand Up @@ -150,14 +154,21 @@ func (e *scaleExecutor) generateJobs(logger logr.Logger, scaledJob *kedav1alpha1
labels[key] = value
}

annotations := map[string]string{
"scaledjob.keda.sh/generation": strconv.FormatInt(scaledJob.Generation, 10),
}
for key, value := range scaledJob.ObjectMeta.Annotations {
annotations[key] = value
}

jobs := make([]*batchv1.Job, int(scaleTo))
for i := 0; i < int(scaleTo); i++ {
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
GenerateName: scaledJob.GetName() + "-",
Namespace: scaledJob.GetNamespace(),
Labels: labels,
Annotations: scaledJob.ObjectMeta.Annotations,
Annotations: annotations,
},
Spec: *scaledJob.Spec.JobTargetRef.DeepCopy(),
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/scaling/executor/scale_jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,11 @@ func TestCreateJobs(t *testing.T) {

func TestGenerateJobs(t *testing.T) {
var (
expectedAnnotations = map[string]string{"test": "test"}
expectedLabels = map[string]string{
expectedAnnotations = map[string]string{
"test": "test",
"scaledjob.keda.sh/generation": "0",
}
expectedLabels = map[string]string{
"app.kubernetes.io/managed-by": "keda-operator",
"app.kubernetes.io/name": "test",
"app.kubernetes.io/part-of": "test",
Expand Down

0 comments on commit d8dac49

Please sign in to comment.