Skip to content
This repository was archived by the owner on Jan 28, 2022. It is now read-only.

Commit 09fd8ec

Browse files
author
Amr Asem
committed
fix: hashing job spec and storing in annotation
1 parent 2a44119 commit 09fd8ec

File tree

4 files changed

+52
-76
lines changed

4 files changed

+52
-76
lines changed

controllers/djob_controller.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,12 @@ func (r *DjobReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
9696
r.Recorder.Event(instance, corev1.EventTypeWarning, "Resetting object", fmt.Sprintf("Failed to reset object: %s", err))
9797
return ctrl.Result{}, fmt.Errorf("error when resetting job: %v", err)
9898
}
99+
if err := r.UpdateHash(instance); err != nil {
100+
return ctrl.Result{}, fmt.Errorf("Failed to update the hash key: %v", err)
101+
}
99102
r.Recorder.Event(instance, corev1.EventTypeNormal, "Reset", "Object is reset")
100103
} else {
101-
r.Log.Info(fmt.Sprintf("Refreshesss for %v", req.NamespacedName))
104+
r.Log.Info(fmt.Sprintf("Refresh for %v", req.NamespacedName))
102105
if err := r.refresh(instance); err != nil {
103106
r.Recorder.Event(instance, corev1.EventTypeWarning, "Refreshing object", fmt.Sprintf("Failed to refresh object: %s", err))
104107
return ctrl.Result{}, fmt.Errorf("error when refreshing job: %v", err)

controllers/djob_controller_databricks.go

Lines changed: 45 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,16 @@ package controllers
1818

1919
import (
2020
"context"
21-
"encoding/json"
2221
"fmt"
23-
"strings"
24-
2522
databricksv1alpha1 "github.com/microsoft/azure-databricks-operator/api/v1alpha1"
23+
"github.com/mitchellh/hashstructure"
2624
dbmodels "github.com/xinsnake/databricks-sdk-golang/azure/models"
2725
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2826
"k8s.io/apimachinery/pkg/types"
2927
"reflect"
3028
"sigs.k8s.io/controller-runtime/pkg/client"
29+
"strconv"
30+
"strings"
3131
)
3232

3333
func (r *DjobReconciler) submit(instance *databricksv1alpha1.Djob) error {
@@ -71,6 +71,13 @@ func (r *DjobReconciler) submit(instance *databricksv1alpha1.Djob) error {
7171
}
7272
instance.ObjectMeta.SetOwnerReferences(references)
7373
}
74+
75+
if hash, err := hashstructure.Hash(instance.Spec, nil); err == nil {
76+
instance.ObjectMeta.SetAnnotations(map[string]string{instance.GetName(): strconv.FormatUint(hash, 10)})
77+
} else {
78+
r.Log.Info(fmt.Sprintf("Failed to hash the Spec for job %s", instance.GetName()))
79+
}
80+
7481
jobSettings := databricksv1alpha1.ToDatabricksJobSettings(instance.Spec)
7582
job, err := r.createJob(jobSettings)
7683

@@ -124,86 +131,36 @@ func (r *DjobReconciler) refresh(instance *databricksv1alpha1.Djob) error {
124131
}
125132

126133
/*
127-
IsDJobUpdated is a method to check if the cluster has the latest version of a certain Djob
134+
IsDJobUpdated checks if the cluster has the latest version of a certain Djob
128135
*/
129136
func (r *DjobReconciler) IsDJobUpdated(instance *databricksv1alpha1.Djob) bool {
130-
// func (r *DjobReconciler) checkIdentity(oldDJob, newDJob *[]byte) bool {
131-
jobID := instance.Status.JobStatus.JobID
132-
jobExisting, err := r.APIClient.Jobs().Get(jobID)
133-
if err != nil {
134-
if strings.Contains(err.Error(), "does not exist") {
135-
return true
136-
}
137-
return true
138-
}
139-
newDJob := []interface{}{}
140-
ExistingDJob := []interface{}{}
141137

142-
if instance.Spec.NotebookTask != nil {
143-
v, _ := json.Marshal(instance.Spec.NotebookTask)
144-
newDJob = append(newDJob, v)
145-
}
146-
if instance.Spec.SparkJarTask != nil {
147-
v, _ := json.Marshal(instance.Spec.SparkJarTask)
148-
newDJob = append(newDJob, v)
138+
currentAnnotation := instance.ObjectMeta.GetAnnotations()[instance.GetName()]
139+
var updatedHash uint64
140+
if returnUpdatedHash, err := hashstructure.Hash(instance.Spec, nil); err != nil {
141+
r.Log.Info(fmt.Sprintf("Deleting job %s", instance.GetName()))
142+
} else {
143+
updatedHash = returnUpdatedHash
149144
}
150145

151-
if instance.Spec.SparkPythonTask != nil {
152-
v, _ := json.Marshal(instance.Spec.SparkPythonTask)
153-
newDJob = append(newDJob, v)
154-
}
146+
// jobID := instance.Status.JobStatus.JobID
147+
// jobExisting, err := r.APIClient.Jobs().Get(jobID)
148+
// if err != nil {
149+
// if strings.Contains(err.Error(), "does not exist") {
150+
// return true
151+
// }
152+
// return true
153+
// }
155154

156-
if instance.Spec.SparkSubmitTask != nil {
157-
v, _ := json.Marshal(instance.Spec.SparkSubmitTask)
158-
newDJob = append(newDJob, v)
159-
}
155+
// hash2, err2 := hashstructure.Hash(jobExisting.Settings, nil)
156+
// if err2 != nil {
157+
// panic(err2)
158+
// }
159+
// r.Log.Info(fmt.Sprintf("Hashs %v", hash2))
160160

161-
if instance.Spec.NewCluster != nil {
162-
v, _ := json.Marshal(instance.Spec.NewCluster)
163-
newDJob = append(newDJob, v)
164-
}
161+
// r.Log.Info(fmt.Sprintf("2 object old %v and new %v", jobExisting.Settings, instance.Spec))
165162

166-
if instance.Spec.Schedule != nil {
167-
v, _ := json.Marshal(instance.Spec.Schedule)
168-
newDJob = append(newDJob, v)
169-
}
170-
newDJob = append(newDJob, instance.Spec.TimeoutSeconds)
171-
newDJob = append(newDJob, instance.Spec.MaxRetries)
172-
173-
////////////////////////////////////////////////
174-
175-
if jobExisting.Settings.NotebookTask != nil {
176-
v, _ := json.Marshal(jobExisting.Settings.NotebookTask)
177-
ExistingDJob = append(ExistingDJob, v)
178-
}
179-
if jobExisting.Settings.SparkJarTask != nil {
180-
v, _ := json.Marshal(jobExisting.Settings.SparkJarTask)
181-
ExistingDJob = append(ExistingDJob, v)
182-
}
183-
184-
if jobExisting.Settings.SparkPythonTask != nil {
185-
v, _ := json.Marshal(jobExisting.Settings.SparkPythonTask)
186-
ExistingDJob = append(ExistingDJob, v)
187-
}
188-
189-
if jobExisting.Settings.SparkSubmitTask != nil {
190-
v, _ := json.Marshal(jobExisting.Settings.SparkSubmitTask)
191-
ExistingDJob = append(ExistingDJob, v)
192-
}
193-
194-
if jobExisting.Settings.NewCluster != nil {
195-
v, _ := json.Marshal(jobExisting.Settings.NewCluster)
196-
ExistingDJob = append(ExistingDJob, v)
197-
}
198-
199-
if jobExisting.Settings.Schedule != nil {
200-
v, _ := json.Marshal(jobExisting.Settings.Schedule)
201-
ExistingDJob = append(ExistingDJob, v)
202-
}
203-
204-
ExistingDJob = append(ExistingDJob, jobExisting.Settings.TimeoutSeconds)
205-
ExistingDJob = append(ExistingDJob, jobExisting.Settings.MaxRetries)
206-
return reflect.DeepEqual(ExistingDJob, newDJob)
163+
return currentAnnotation == strconv.FormatUint(updatedHash, 10)
207164
}
208165

209166
func (r *DjobReconciler) delete(instance *databricksv1alpha1.Djob) error {
@@ -243,6 +200,19 @@ func (r *DjobReconciler) createJob(jobSettings dbmodels.JobSettings) (job dbmode
243200
return job, err
244201
}
245202

203+
// UpdateHash updates the current job with a new annotation key
204+
func (r *DjobReconciler) UpdateHash(instance *databricksv1alpha1.Djob) error {
205+
hash, err := hashstructure.Hash(instance.Spec, nil)
206+
if err != nil {
207+
return err
208+
}
209+
210+
delete(instance.GetAnnotations(), instance.GetName())
211+
instance.ObjectMeta.SetAnnotations(map[string]string{instance.GetName(): strconv.FormatUint(hash, 10)})
212+
213+
return r.Update(context.Background(), instance)
214+
}
215+
246216
func (r *DjobReconciler) reset(instance *databricksv1alpha1.Djob) error {
247217
r.Log.Info(fmt.Sprintf("Reset job %s", instance.GetName()))
248218
jobSettings := databricksv1alpha1.ToDatabricksJobSettings(instance.Spec)

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ require (
77
github.com/go-logr/logr v0.1.0
88
github.com/jstemmer/go-junit-report v0.9.1 // indirect
99
github.com/matm/gocov-html v0.0.0-20191111163307-9ee104d84c82 // indirect
10+
github.com/mitchellh/hashstructure v1.0.0
1011
github.com/onsi/ginkgo v1.10.3
1112
github.com/onsi/gomega v1.7.0
1213
github.com/prometheus/client_golang v0.9.2

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,8 @@ github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hd
196196
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
197197
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
198198
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
199+
github.com/mitchellh/hashstructure v1.0.0 h1:ZkRJX1CyOoTkar7p/mLS5TZU4nJ1Rn/F8u9dGS02Q3Y=
200+
github.com/mitchellh/hashstructure v1.0.0/go.mod h1:QjSHrPWS+BGUVBYkbTZWEnOh3G1DutKwClXU/ABz6AQ=
199201
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
200202
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
201203
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=

0 commit comments

Comments
 (0)