Skip to content
This repository was archived by the owner on Jan 28, 2022. It is now read-only.

Commit c358c49

Browse files
Adding support for using existing_cluster_name as well as existing_cluster_id #86
1 parent c28d567 commit c358c49

14 files changed

+241
-21
lines changed

README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,14 @@
1010

1111
Kubernetes offers the facility of extending its API through the concept of 'Operators' ([Introducing Operators: Putting Operational Knowledge into Software](https://coreos.com/blog/introducing-operators.html)). This repository contains the resources and code to deploy an Azure Databricks Operator for Kubernetes.
1212

13+
![alt text](docs/images/azure-databricks-operator-highlevel.jpg "high level architecture")
1314

14-
![alt text](docs/images/azure-databricks-operator.jpg "high level architecture")
1515

1616
The Databricks operator is useful in situations where Kubernetes hosted applications wish to launch and use Databricks data engineering and machine learning tasks.
1717

18+
![alt text](docs/images/azure-databricks-operator.jpg "high level architecture")
19+
20+
1821
The project was built using
1922

2023
1. [Kubebuilder](https://book.kubebuilder.io/)

api/v1alpha1/dcluster_types.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ type DclusterStatus struct {
3232
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
3333
// +kubebuilder:printcolumn:name="ClusterID",type="string",JSONPath=".status.cluster_info.cluster_id"
3434
// +kubebuilder:printcolumn:name="State",type="string",JSONPath=".status.cluster_info.state"
35-
// +kubebuilder:printcolumn:name="NumWorkers",type="integer",JSONPath=".status.cluster_info.num_workers"
3635
type Dcluster struct {
3736
metav1.TypeMeta `json:",inline"`
3837
metav1.ObjectMeta `json:"metadata,omitempty"`

api/v1alpha1/djob_types.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ type Djob struct {
3636
metav1.TypeMeta `json:",inline"`
3737
metav1.ObjectMeta `json:"metadata,omitempty"`
3838

39-
Spec *dbmodels.JobSettings `json:"spec,omitempty"`
40-
Status *DjobStatus `json:"status,omitempty"`
39+
Spec *JobSettings `json:"spec,omitempty"`
40+
Status *DjobStatus `json:"status,omitempty"`
4141
}
4242

4343
// IsBeingDeleted returns true if a deletion timestamp is set

api/v1alpha1/djob_types_extra.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
Copyright 2019 microsoft.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package v1alpha1
18+
19+
import (
20+
dbmodels "github.com/xinsnake/databricks-sdk-golang/azure/models"
21+
)
22+
23+
// JobSettings is similar to dbmodels.JobSettings, the reason it
24+
// exists is because dbmodels.JobSettings doesn't support ExistingClusterName
25+
// ExistingClusterName allows discovering databricks clusters by its kubernetes object name
26+
type JobSettings struct {
27+
ExistingClusterID string `json:"existing_cluster_id,omitempty" url:"existing_cluster_id,omitempty"`
28+
ExistingClusterName string `json:"existing_cluster_name,omitempty" url:"existing_cluster_name,omitempty"`
29+
NewCluster *dbmodels.NewCluster `json:"new_cluster,omitempty" url:"new_cluster,omitempty"`
30+
NotebookTask *dbmodels.NotebookTask `json:"notebook_task,omitempty" url:"notebook_task,omitempty"`
31+
SparkJarTask *dbmodels.SparkJarTask `json:"spark_jar_task,omitempty" url:"spark_jar_task,omitempty"`
32+
SparkPythonTask *dbmodels.SparkPythonTask `json:"spark_python_task,omitempty" url:"spark_python_task,omitempty"`
33+
SparkSubmitTask *dbmodels.SparkSubmitTask `json:"spark_submit_task,omitempty" url:"spark_submit_task,omitempty"`
34+
Name string `json:"name,omitempty" url:"name,omitempty"`
35+
Libraries []dbmodels.Library `json:"libraries,omitempty" url:"libraries,omitempty"`
36+
EmailNotifications *dbmodels.JobEmailNotifications `json:"email_notifications,omitempty" url:"email_notifications,omitempty"`
37+
TimeoutSeconds int32 `json:"timeout_seconds,omitempty" url:"timeout_seconds,omitempty"`
38+
MaxRetries int32 `json:"max_retries,omitempty" url:"max_retries,omitempty"`
39+
MinRetryIntervalMillis int32 `json:"min_retry_interval_millis,omitempty" url:"min_retry_interval_millis,omitempty"`
40+
RetryOnTimeout bool `json:"retry_on_timeout,omitempty" url:"retry_on_timeout,omitempty"`
41+
Schedule *dbmodels.CronSchedule `json:"schedule,omitempty" url:"schedule,omitempty"`
42+
MaxConcurrentRuns int32 `json:"max_concurrent_runs,omitempty" url:"max_concurrent_runs,omitempty"`
43+
}
44+
// ToK8sJobSettings converts a databricks JobSettings object to k8s JobSettings object.
45+
// It is needed to add ExistingClusterName and follow k8s camelCase naming convention
46+
func ToK8sJobSettings(dbjs *dbmodels.JobSettings) JobSettings {
47+
var k8sjs JobSettings
48+
k8sjs.ExistingClusterID = dbjs.ExistingClusterID
49+
k8sjs.NewCluster = dbjs.NewCluster
50+
k8sjs.NotebookTask = dbjs.NotebookTask
51+
k8sjs.SparkJarTask = dbjs.SparkJarTask
52+
k8sjs.SparkPythonTask = dbjs.SparkPythonTask
53+
k8sjs.SparkSubmitTask = dbjs.SparkSubmitTask
54+
k8sjs.Name = dbjs.Name
55+
k8sjs.Libraries = dbjs.Libraries
56+
k8sjs.EmailNotifications = dbjs.EmailNotifications
57+
k8sjs.TimeoutSeconds = dbjs.TimeoutSeconds
58+
k8sjs.MaxRetries = dbjs.MaxRetries
59+
k8sjs.MinRetryIntervalMillis = dbjs.MinRetryIntervalMillis
60+
k8sjs.RetryOnTimeout = dbjs.RetryOnTimeout
61+
k8sjs.Schedule = dbjs.Schedule
62+
k8sjs.MaxConcurrentRuns = dbjs.MaxConcurrentRuns
63+
return k8sjs
64+
}
65+
66+
// ToDatabricksJobSettings converts a k8s JobSettings object to a DataBricks JobSettings object.
67+
// It is needed to add ExistingClusterName and follow k8s camelCase naming convention
68+
func ToDatabricksJobSettings(k8sjs *JobSettings) dbmodels.JobSettings {
69+
70+
var dbjs dbmodels.JobSettings
71+
dbjs.ExistingClusterID = k8sjs.ExistingClusterID
72+
dbjs.NewCluster = k8sjs.NewCluster
73+
dbjs.NotebookTask = k8sjs.NotebookTask
74+
dbjs.SparkJarTask = k8sjs.SparkJarTask
75+
dbjs.SparkPythonTask = k8sjs.SparkPythonTask
76+
dbjs.SparkSubmitTask = k8sjs.SparkSubmitTask
77+
dbjs.Name = k8sjs.Name
78+
dbjs.Libraries = k8sjs.Libraries
79+
dbjs.EmailNotifications = k8sjs.EmailNotifications
80+
dbjs.TimeoutSeconds = k8sjs.TimeoutSeconds
81+
dbjs.MaxRetries = k8sjs.MaxRetries
82+
dbjs.MinRetryIntervalMillis = k8sjs.MinRetryIntervalMillis
83+
dbjs.RetryOnTimeout = k8sjs.RetryOnTimeout
84+
dbjs.Schedule = k8sjs.Schedule
85+
dbjs.MaxConcurrentRuns = k8sjs.MaxConcurrentRuns
86+
return dbjs
87+
}

api/v1alpha1/zz_generated.deepcopy.go

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ func (in *Djob) DeepCopyInto(out *Djob) {
298298
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
299299
if in.Spec != nil {
300300
in, out := &in.Spec, &out.Spec
301-
*out = new(models.JobSettings)
301+
*out = new(JobSettings)
302302
(*in).DeepCopyInto(*out)
303303
}
304304
if in.Status != nil {
@@ -385,6 +385,63 @@ func (in *DjobStatus) DeepCopy() *DjobStatus {
385385
return out
386386
}
387387

388+
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
389+
func (in *JobSettings) DeepCopyInto(out *JobSettings) {
390+
*out = *in
391+
if in.NewCluster != nil {
392+
in, out := &in.NewCluster, &out.NewCluster
393+
*out = new(models.NewCluster)
394+
(*in).DeepCopyInto(*out)
395+
}
396+
if in.NotebookTask != nil {
397+
in, out := &in.NotebookTask, &out.NotebookTask
398+
*out = new(models.NotebookTask)
399+
(*in).DeepCopyInto(*out)
400+
}
401+
if in.SparkJarTask != nil {
402+
in, out := &in.SparkJarTask, &out.SparkJarTask
403+
*out = new(models.SparkJarTask)
404+
(*in).DeepCopyInto(*out)
405+
}
406+
if in.SparkPythonTask != nil {
407+
in, out := &in.SparkPythonTask, &out.SparkPythonTask
408+
*out = new(models.SparkPythonTask)
409+
(*in).DeepCopyInto(*out)
410+
}
411+
if in.SparkSubmitTask != nil {
412+
in, out := &in.SparkSubmitTask, &out.SparkSubmitTask
413+
*out = new(models.SparkSubmitTask)
414+
(*in).DeepCopyInto(*out)
415+
}
416+
if in.Libraries != nil {
417+
in, out := &in.Libraries, &out.Libraries
418+
*out = make([]models.Library, len(*in))
419+
for i := range *in {
420+
(*in)[i].DeepCopyInto(&(*out)[i])
421+
}
422+
}
423+
if in.EmailNotifications != nil {
424+
in, out := &in.EmailNotifications, &out.EmailNotifications
425+
*out = new(models.JobEmailNotifications)
426+
(*in).DeepCopyInto(*out)
427+
}
428+
if in.Schedule != nil {
429+
in, out := &in.Schedule, &out.Schedule
430+
*out = new(models.CronSchedule)
431+
**out = **in
432+
}
433+
}
434+
435+
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new JobSettings.
436+
func (in *JobSettings) DeepCopy() *JobSettings {
437+
if in == nil {
438+
return nil
439+
}
440+
out := new(JobSettings)
441+
in.DeepCopyInto(out)
442+
return out
443+
}
444+
388445
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
389446
func (in *Run) DeepCopyInto(out *Run) {
390447
*out = *in

config/crd/bases/databricks.microsoft.com_dclusters.yaml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,6 @@ spec:
1616
- JSONPath: .status.cluster_info.state
1717
name: State
1818
type: string
19-
- JSONPath: .status.cluster_info.num_workers
20-
name: NumWorkers
21-
type: integer
2219
group: databricks.microsoft.com
2320
names:
2421
kind: Dcluster

config/crd/bases/databricks.microsoft.com_djobs.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ spec:
3636
metadata:
3737
type: object
3838
spec:
39+
description: JobSettings is similar to dbmodels.JobSettings, the reason
40+
it exists is because dbmodels.JobSettings doesn't support ExistingClusterName
41+
ExistingClusterName allows discovering databricks clusters by its kubernetes
42+
object name
3943
properties:
4044
email_notifications:
4145
properties:
@@ -56,6 +60,8 @@ spec:
5660
type: object
5761
existing_cluster_id:
5862
type: string
63+
existing_cluster_name:
64+
type: string
5965
libraries:
6066
items:
6167
properties:

config/samples/databricks_v1alpha1_dcluster.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,4 @@ spec:
77
node_type_id: Standard_D3_v2
88
autoscale:
99
min_workers: 2
10-
max_workers: 5
10+
max_workers: 3

controllers/dcluster_controller.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,14 @@ import (
2222
"time"
2323

2424
"github.com/go-logr/logr"
25+
databricksv1alpha1 "github.com/microsoft/azure-databricks-operator/api/v1alpha1"
2526
dbazure "github.com/xinsnake/databricks-sdk-golang/azure"
2627
corev1 "k8s.io/api/core/v1"
2728
"k8s.io/apimachinery/pkg/api/errors"
29+
"k8s.io/apimachinery/pkg/runtime"
2830
"k8s.io/client-go/tools/record"
2931
ctrl "sigs.k8s.io/controller-runtime"
3032
"sigs.k8s.io/controller-runtime/pkg/client"
31-
32-
databricksv1alpha1 "github.com/microsoft/azure-databricks-operator/api/v1alpha1"
3333
)
3434

3535
// DclusterReconciler reconciles a Dcluster object
@@ -103,8 +103,25 @@ func (r *DclusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
103103
return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
104104
}
105105

106+
const dclusterIndexKey = ".status.cluster_info.cluster_id"
107+
108+
106109
// SetupWithManager adds the controller manager
107110
func (r *DclusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
111+
112+
if err := mgr.GetFieldIndexer().IndexField(&databricksv1alpha1.Dcluster{}, dclusterIndexKey, func(rawObj runtime.Object) []string {
113+
dcluster := rawObj.(*databricksv1alpha1.Dcluster)
114+
if dcluster == nil {
115+
return nil
116+
}
117+
if dcluster.Status == nil || dcluster.Status.ClusterInfo == nil {
118+
return nil
119+
}
120+
return []string{dcluster.Status.ClusterInfo.ClusterID}
121+
}); err != nil {
122+
return err
123+
}
124+
108125
return ctrl.NewControllerManagedBy(mgr).
109126
For(&databricksv1alpha1.Dcluster{}).
110127
Complete(r)

controllers/djob_controller_databricks.go

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,23 +19,60 @@ package controllers
1919
import (
2020
"context"
2121
"fmt"
22-
"reflect"
23-
"strings"
24-
2522
databricksv1alpha1 "github.com/microsoft/azure-databricks-operator/api/v1alpha1"
23+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2624
"k8s.io/apimachinery/pkg/types"
25+
"reflect"
26+
"sigs.k8s.io/controller-runtime/pkg/client"
27+
"strings"
2728
)
2829

2930
func (r *DjobReconciler) submit(instance *databricksv1alpha1.Djob) error {
3031
r.Log.Info(fmt.Sprintf("Submitting job %s", instance.GetName()))
31-
3232
instance.Spec.Name = instance.GetName()
33-
34-
job, err := r.APIClient.Jobs().Create(*instance.Spec)
33+
// Get existing databricks cluster by cluster name and set ExistingClusterID or
34+
// Get existing databricks cluster by cluster id
35+
var ownerInstance databricksv1alpha1.Dcluster
36+
if len(instance.Spec.ExistingClusterName) > 0 {
37+
dClusterNamespacedName := types.NamespacedName{Name: instance.Spec.ExistingClusterName, Namespace: instance.Namespace}
38+
err := r.Get(context.Background(), dClusterNamespacedName, &ownerInstance)
39+
if err != nil {
40+
return err
41+
}
42+
if (ownerInstance.Status != nil) && (ownerInstance.Status.ClusterInfo != nil) && len(ownerInstance.Status.ClusterInfo.ClusterID) > 0 {
43+
instance.Spec.ExistingClusterID = ownerInstance.Status.ClusterInfo.ClusterID
44+
} else {
45+
return fmt.Errorf("failed to get ClusterID of %v", instance.Spec.ExistingClusterName)
46+
}
47+
} else if len(instance.Spec.ExistingClusterID) > 0 {
48+
var dclusters databricksv1alpha1.DclusterList
49+
err := r.List(context.Background(), &dclusters, client.InNamespace(instance.Namespace), client.MatchingField(dclusterIndexKey, instance.Spec.ExistingClusterID))
50+
if err != nil {
51+
return err
52+
}
53+
if len(dclusters.Items) == 1 {
54+
ownerInstance = dclusters.Items[0]
55+
} else {
56+
return fmt.Errorf("failed to get ClusterID of %v", instance.Spec.ExistingClusterID)
57+
}
58+
}
59+
// Set existing cluster as owner of job
60+
if &ownerInstance != nil {
61+
references := []metav1.OwnerReference{
62+
{
63+
APIVersion: ownerInstance.APIVersion,
64+
Kind: ownerInstance.Kind ,
65+
Name: ownerInstance.GetName(),
66+
UID: ownerInstance.GetUID(),
67+
},
68+
}
69+
instance.ObjectMeta.SetOwnerReferences(references)
70+
}
71+
jobSettings := databricksv1alpha1.ToDatabricksJobSettings(instance.Spec)
72+
job, err := r.APIClient.Jobs().Create(jobSettings)
3573
if err != nil {
3674
return err
3775
}
38-
3976
instance.Spec.Name = instance.GetName()
4077
instance.Status = &databricksv1alpha1.DjobStatus{
4178
JobStatus: &job,

0 commit comments

Comments
 (0)