diff --git a/README.md b/README.md
index 91561af..6af7398 100644
--- a/README.md
+++ b/README.md
@@ -10,11 +10,14 @@
 Kubernetes offers the facility of extending its API through the concept of 'Operators' ([Introducing Operators: Putting Operational Knowledge into Software](https://coreos.com/blog/introducing-operators.html)). This repository contains the resources and code to deploy an Azure Databricks Operator for Kubernetes.
 
+![alt text](docs/images/azure-databricks-operator-highlevel.jpg "high level architecture")
-![alt text](docs/images/azure-databricks-operator.jpg "high level architecture")
 
 The Databricks operator is useful in situations where Kubernetes hosted applications wish to launch and use Databricks data engineering and machine learning tasks.
 
+![alt text](docs/images/azure-databricks-operator.jpg "high level architecture")
+
+
 The project was built using
 
 1. [Kubebuilder](https://book.kubebuilder.io/)
diff --git a/api/v1alpha1/dbfsblock_types_test.go b/api/v1alpha1/dbfsblock_types_test.go
index 97610df..2995606 100644
--- a/api/v1alpha1/dbfsblock_types_test.go
+++ b/api/v1alpha1/dbfsblock_types_test.go
@@ -54,13 +54,13 @@ var _ = Describe("DbfsBlock", func() {
 		It("should create an object successfully", func() {
 
 			key = types.NamespacedName{
-				Name:      "foo",
+				Name:      "foo-" + RandomString(5),
 				Namespace: "default",
 			}
 			created = &DbfsBlock{
 				ObjectMeta: metav1.ObjectMeta{
-					Name:      "foo",
-					Namespace: "default",
+					Name:      key.Name,
+					Namespace: key.Namespace,
 				}}
 
 			By("creating an API obj")
diff --git a/api/v1alpha1/dcluster_types.go b/api/v1alpha1/dcluster_types.go
index 49f9cab..e806e4d 100644
--- a/api/v1alpha1/dcluster_types.go
+++ b/api/v1alpha1/dcluster_types.go
@@ -32,7 +32,6 @@ type DclusterStatus struct {
 // +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
 // +kubebuilder:printcolumn:name="ClusterID",type="string",JSONPath=".status.cluster_info.cluster_id"
 // +kubebuilder:printcolumn:name="State",type="string",JSONPath=".status.cluster_info.state"
-// +kubebuilder:printcolumn:name="NumWorkers",type="integer",JSONPath=".status.cluster_info.num_workers"
 type Dcluster struct {
 	metav1.TypeMeta   `json:",inline"`
 	metav1.ObjectMeta `json:"metadata,omitempty"`
diff --git a/api/v1alpha1/dcluster_types_test.go b/api/v1alpha1/dcluster_types_test.go
index 5d878b5..c9fd1a6 100644
--- a/api/v1alpha1/dcluster_types_test.go
+++ b/api/v1alpha1/dcluster_types_test.go
@@ -54,13 +54,13 @@ var _ = Describe("Dcluster", func() {
 		It("should create an object successfully", func() {
 
 			key = types.NamespacedName{
-				Name:      "foo",
+				Name:      "foo-" + RandomString(5),
 				Namespace: "default",
 			}
 			created = &Dcluster{
 				ObjectMeta: metav1.ObjectMeta{
-					Name:      "foo",
-					Namespace: "default",
+					Name:      key.Name,
+					Namespace: key.Namespace,
 				}}
 
 			By("creating an API obj")
diff --git a/api/v1alpha1/djob_types.go b/api/v1alpha1/djob_types.go
index 45bedfb..29972b5 100644
--- a/api/v1alpha1/djob_types.go
+++ b/api/v1alpha1/djob_types.go
@@ -36,8 +36,8 @@ type Djob struct {
 	metav1.TypeMeta   `json:",inline"`
 	metav1.ObjectMeta `json:"metadata,omitempty"`
 
-	Spec   *dbmodels.JobSettings `json:"spec,omitempty"`
-	Status *DjobStatus           `json:"status,omitempty"`
+	Spec   *JobSettings `json:"spec,omitempty"`
+	Status *DjobStatus  `json:"status,omitempty"`
 }
 
 // IsBeingDeleted returns true if a deletion timestamp is set
diff --git a/api/v1alpha1/djob_types_extra.go b/api/v1alpha1/djob_types_extra.go
new file mode 100644
index 0000000..721d048
--- /dev/null
+++ b/api/v1alpha1/djob_types_extra.go
@@ -0,0 +1,88 @@
+/*
+Copyright 2019 microsoft.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package v1alpha1
+
+import (
+	dbmodels "github.com/xinsnake/databricks-sdk-golang/azure/models"
+)
+
+// JobSettings is similar to dbmodels.JobSettings; it exists because
+// dbmodels.JobSettings doesn't support ExistingClusterName.
+// ExistingClusterName allows discovering Databricks clusters by their Kubernetes object name.
+type JobSettings struct {
+	ExistingClusterID      string                          `json:"existing_cluster_id,omitempty" url:"existing_cluster_id,omitempty"`
+	ExistingClusterName    string                          `json:"existing_cluster_name,omitempty" url:"existing_cluster_name,omitempty"`
+	NewCluster             *dbmodels.NewCluster            `json:"new_cluster,omitempty" url:"new_cluster,omitempty"`
+	NotebookTask           *dbmodels.NotebookTask          `json:"notebook_task,omitempty" url:"notebook_task,omitempty"`
+	SparkJarTask           *dbmodels.SparkJarTask          `json:"spark_jar_task,omitempty" url:"spark_jar_task,omitempty"`
+	SparkPythonTask        *dbmodels.SparkPythonTask       `json:"spark_python_task,omitempty" url:"spark_python_task,omitempty"`
+	SparkSubmitTask        *dbmodels.SparkSubmitTask       `json:"spark_submit_task,omitempty" url:"spark_submit_task,omitempty"`
+	Name                   string                          `json:"name,omitempty" url:"name,omitempty"`
+	Libraries              []dbmodels.Library              `json:"libraries,omitempty" url:"libraries,omitempty"`
+	EmailNotifications     *dbmodels.JobEmailNotifications `json:"email_notifications,omitempty" url:"email_notifications,omitempty"`
+	TimeoutSeconds         int32                           `json:"timeout_seconds,omitempty" url:"timeout_seconds,omitempty"`
+	MaxRetries             int32                           `json:"max_retries,omitempty" url:"max_retries,omitempty"`
+	MinRetryIntervalMillis int32                           `json:"min_retry_interval_millis,omitempty" url:"min_retry_interval_millis,omitempty"`
+	RetryOnTimeout         bool                            `json:"retry_on_timeout,omitempty" url:"retry_on_timeout,omitempty"`
+	Schedule               *dbmodels.CronSchedule          `json:"schedule,omitempty" url:"schedule,omitempty"`
+	MaxConcurrentRuns      int32                           `json:"max_concurrent_runs,omitempty" url:"max_concurrent_runs,omitempty"`
+}
+
+// ToK8sJobSettings converts a Databricks JobSettings object to a k8s JobSettings object.
+// It is needed to add ExistingClusterName and follow the k8s camelCase naming convention.
+func ToK8sJobSettings(dbjs *dbmodels.JobSettings) JobSettings {
+	var k8sjs JobSettings
+	k8sjs.ExistingClusterID = dbjs.ExistingClusterID
+	k8sjs.NewCluster = dbjs.NewCluster
+	k8sjs.NotebookTask = dbjs.NotebookTask
+	k8sjs.SparkJarTask = dbjs.SparkJarTask
+	k8sjs.SparkPythonTask = dbjs.SparkPythonTask
+	k8sjs.SparkSubmitTask = dbjs.SparkSubmitTask
+	k8sjs.Name = dbjs.Name
+	k8sjs.Libraries = dbjs.Libraries
+	k8sjs.EmailNotifications = dbjs.EmailNotifications
+	k8sjs.TimeoutSeconds = dbjs.TimeoutSeconds
+	k8sjs.MaxRetries = dbjs.MaxRetries
+	k8sjs.MinRetryIntervalMillis = dbjs.MinRetryIntervalMillis
+	k8sjs.RetryOnTimeout = dbjs.RetryOnTimeout
+	k8sjs.Schedule = dbjs.Schedule
+	k8sjs.MaxConcurrentRuns = dbjs.MaxConcurrentRuns
+	return k8sjs
+}
+
+// ToDatabricksJobSettings converts a k8s JobSettings object to a Databricks JobSettings object.
+// It is needed to drop ExistingClusterName, which the Databricks API does not understand.
+func ToDatabricksJobSettings(k8sjs *JobSettings) dbmodels.JobSettings {
+
+	var dbjs dbmodels.JobSettings
+	dbjs.ExistingClusterID = k8sjs.ExistingClusterID
+	dbjs.NewCluster = k8sjs.NewCluster
+	dbjs.NotebookTask = k8sjs.NotebookTask
+	dbjs.SparkJarTask = k8sjs.SparkJarTask
+	dbjs.SparkPythonTask = k8sjs.SparkPythonTask
+	dbjs.SparkSubmitTask = k8sjs.SparkSubmitTask
+	dbjs.Name = k8sjs.Name
+	dbjs.Libraries = k8sjs.Libraries
+	dbjs.EmailNotifications = k8sjs.EmailNotifications
+	dbjs.TimeoutSeconds = k8sjs.TimeoutSeconds
+	dbjs.MaxRetries = k8sjs.MaxRetries
+	dbjs.MinRetryIntervalMillis = k8sjs.MinRetryIntervalMillis
+	dbjs.RetryOnTimeout = k8sjs.RetryOnTimeout
+	dbjs.Schedule = k8sjs.Schedule
+	dbjs.MaxConcurrentRuns = k8sjs.MaxConcurrentRuns
+	return dbjs
+}
diff --git a/api/v1alpha1/djob_types_test.go b/api/v1alpha1/djob_types_test.go
index d95e5ab..701df87 100644
--- a/api/v1alpha1/djob_types_test.go
+++ b/api/v1alpha1/djob_types_test.go
@@ -54,15 +54,14 @@ var _ = Describe("Djob", func() {
 		It("should create an object successfully", func() {
 
 			key = types.NamespacedName{
-				Name:      "foo",
+				Name:      "foo-" + RandomString(5),
 				Namespace: "default",
 			}
 			created = &Djob{
 				ObjectMeta: metav1.ObjectMeta{
-					Name:      "foo",
-					Namespace: "default",
+					Name:      key.Name,
+					Namespace: key.Namespace,
 				}}
-
 			By("creating an API obj")
 			Expect(k8sClient.Create(context.Background(), created)).To(Succeed())
diff --git a/api/v1alpha1/run_types_test.go b/api/v1alpha1/run_types_test.go
index a7a9268..0a4c24e 100644
--- a/api/v1alpha1/run_types_test.go
+++ b/api/v1alpha1/run_types_test.go
@@ -55,13 +55,13 @@ var _ = Describe("Run", func() {
 		It("should create an object successfully", func() {
 
 			key = types.NamespacedName{
-				Name:      "foo",
+				Name:      "foo-" + RandomString(5),
 				Namespace: "default",
 			}
 			created = &Run{
 				ObjectMeta: metav1.ObjectMeta{
-					Name:      "foo",
-					Namespace: "default",
+					Name:      key.Name,
+					Namespace: key.Namespace,
 				}}
 
 			By("creating an API obj")
diff --git a/api/v1alpha1/secretscope_types_test.go b/api/v1alpha1/secretscope_types_test.go
index 8528f8a..4a08fe8 100644
--- a/api/v1alpha1/secretscope_types_test.go
+++ b/api/v1alpha1/secretscope_types_test.go
@@ -53,13 +53,13 @@ var _ = Describe("SecretScope", func() {
 		It("should create an object successfully", func() {
 
 			key = types.NamespacedName{
-				Name:      "foo",
+				Name:      "foo-" + RandomString(5),
 				Namespace: "default",
 			}
 			created = &SecretScope{
 				ObjectMeta: metav1.ObjectMeta{
-					Name:      "foo",
-					Namespace: "default",
+					Name:      key.Name,
+					Namespace: key.Namespace,
 				}}
 
 			By("creating an API obj")
diff --git a/api/v1alpha1/workspaceitem_types_test.go b/api/v1alpha1/workspaceitem_types_test.go
index f305cff..fd4dd85 100644
--- a/api/v1alpha1/workspaceitem_types_test.go
+++ b/api/v1alpha1/workspaceitem_types_test.go
@@ -54,13 +54,13 @@ var _ = Describe("WorkspaceItem", func() {
 		It("should create an object successfully", func() {
 
 			key = types.NamespacedName{
-				Name:      "foo",
+				Name:      "foo-" + RandomString(5),
 				Namespace: "default",
 			}
 			created = &WorkspaceItem{
 				ObjectMeta: metav1.ObjectMeta{
-					Name:      "foo",
-					Namespace: "default",
+					Name:      key.Name,
+					Namespace: key.Namespace,
 				}}
 
 			By("creating an API obj")
diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go
index 7eb688e..ba63de5 100644
--- a/api/v1alpha1/zz_generated.deepcopy.go
+++ b/api/v1alpha1/zz_generated.deepcopy.go
@@ -298,7 +298,7 @@ func (in *Djob) DeepCopyInto(out *Djob) {
 	in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
 	if in.Spec != nil {
 		in, out := &in.Spec, &out.Spec
-		*out = new(models.JobSettings)
+		*out = new(JobSettings)
 		(*in).DeepCopyInto(*out)
 	}
 	if in.Status != nil {
@@ -385,6 +385,63 @@ func (in *DjobStatus) DeepCopy() *DjobStatus {
 	return out
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *JobSettings) DeepCopyInto(out *JobSettings) {
+	*out = *in
+	if in.NewCluster != nil {
+		in, out := &in.NewCluster, &out.NewCluster
+		*out = new(models.NewCluster)
+		(*in).DeepCopyInto(*out)
+	}
+	if in.NotebookTask != nil {
+		in, out := &in.NotebookTask, &out.NotebookTask
+		*out = new(models.NotebookTask)
+		(*in).DeepCopyInto(*out)
+	}
+	if in.SparkJarTask != nil {
+		in, out := &in.SparkJarTask, &out.SparkJarTask
+		*out = new(models.SparkJarTask)
+		(*in).DeepCopyInto(*out)
+	}
+	if in.SparkPythonTask != nil {
+		in, out := &in.SparkPythonTask, &out.SparkPythonTask
+		*out = new(models.SparkPythonTask)
+		(*in).DeepCopyInto(*out)
+	}
+	if in.SparkSubmitTask != nil {
+		in, out := &in.SparkSubmitTask, &out.SparkSubmitTask
+		*out = new(models.SparkSubmitTask)
+		(*in).DeepCopyInto(*out)
+	}
+	if in.Libraries != nil {
+		in, out := &in.Libraries, &out.Libraries
+		*out = make([]models.Library, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
+	}
+	if in.EmailNotifications != nil {
+		in, out := &in.EmailNotifications, &out.EmailNotifications
+		*out = new(models.JobEmailNotifications)
+		(*in).DeepCopyInto(*out)
+	}
+	if in.Schedule != nil {
+		in, out := &in.Schedule, &out.Schedule
+		*out = new(models.CronSchedule)
+		**out = **in
+	}
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new JobSettings.
+func (in *JobSettings) DeepCopy() *JobSettings {
+	if in == nil {
+		return nil
+	}
+	out := new(JobSettings)
+	in.DeepCopyInto(out)
+	return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *Run) DeepCopyInto(out *Run) {
 	*out = *in
diff --git a/config/crd/bases/databricks.microsoft.com_dclusters.yaml b/config/crd/bases/databricks.microsoft.com_dclusters.yaml
index c2667f9..2bcd9d3 100644
--- a/config/crd/bases/databricks.microsoft.com_dclusters.yaml
+++ b/config/crd/bases/databricks.microsoft.com_dclusters.yaml
@@ -16,9 +16,6 @@ spec:
   - JSONPath: .status.cluster_info.state
     name: State
     type: string
-  - JSONPath: .status.cluster_info.num_workers
-    name: NumWorkers
-    type: integer
   group: databricks.microsoft.com
   names:
     kind: Dcluster
diff --git a/config/crd/bases/databricks.microsoft.com_djobs.yaml b/config/crd/bases/databricks.microsoft.com_djobs.yaml
index b5210e7..5c3df9f 100644
--- a/config/crd/bases/databricks.microsoft.com_djobs.yaml
+++ b/config/crd/bases/databricks.microsoft.com_djobs.yaml
@@ -36,6 +36,10 @@ spec:
           metadata:
             type: object
           spec:
+            description: JobSettings is similar to dbmodels.JobSettings; it exists
+              because dbmodels.JobSettings doesn't support ExistingClusterName.
+              ExistingClusterName allows discovering Databricks clusters by their
+              Kubernetes object name.
             properties:
               email_notifications:
                 properties:
@@ -56,6 +60,8 @@ spec:
                 type: object
               existing_cluster_id:
                 type: string
+              existing_cluster_name:
+                type: string
               libraries:
                 items:
                   properties:
diff --git a/config/samples/databricks_v1alpha1_dcluster.yaml b/config/samples/databricks_v1alpha1_dcluster.yaml
index 6df2e8c..14061d0 100644
--- a/config/samples/databricks_v1alpha1_dcluster.yaml
+++ b/config/samples/databricks_v1alpha1_dcluster.yaml
@@ -7,4 +7,4 @@ spec:
   node_type_id: Standard_D3_v2
   autoscale:
     min_workers: 2
-    max_workers: 5
+    max_workers: 3
diff --git a/controllers/dbfsblock_controller_test.go b/controllers/dbfsblock_controller_test.go
index 60a7c45..d6b8fa2 100644
--- a/controllers/dbfsblock_controller_test.go
+++ b/controllers/dbfsblock_controller_test.go
@@ -58,7 +58,7 @@ var _ = Describe("DbfsBlock Controller", func() {
 		dataStr2 := base64.StdEncoding.EncodeToString(data2)
 
 		key := types.NamespacedName{
-			Name:      "block-greater-than-1mb",
+			Name:      "t-block-greater-than-1mb" + randomStringWithCharset(10, charset),
 			Namespace: "default",
 		}
diff --git a/controllers/dcluster_controller.go b/controllers/dcluster_controller.go
index 05458ab..2206140 100644
--- a/controllers/dcluster_controller.go
+++ b/controllers/dcluster_controller.go
@@ -22,14 +22,14 @@ import (
 	"time"
 
 	"github.com/go-logr/logr"
+	databricksv1alpha1 "github.com/microsoft/azure-databricks-operator/api/v1alpha1"
 	dbazure "github.com/xinsnake/databricks-sdk-golang/azure"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/client-go/tools/record"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
-
-	databricksv1alpha1 "github.com/microsoft/azure-databricks-operator/api/v1alpha1"
 )
 
 // DclusterReconciler reconciles a Dcluster object
@@ -103,8 +103,19 @@ func (r *DclusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
 	return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
 }
 
+const dclusterIndexKey = ".status.cluster_info.cluster_id"
+
 // SetupWithManager adds the controller manager
 func (r *DclusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
+	if err := mgr.GetFieldIndexer().IndexField(&databricksv1alpha1.Dcluster{}, dclusterIndexKey, func(rawObj runtime.Object) []string {
+		dcluster := rawObj.(*databricksv1alpha1.Dcluster)
+		if dcluster == nil || dcluster.Status == nil || dcluster.Status.ClusterInfo == nil {
+			return nil
+		}
+		return []string{dcluster.Status.ClusterInfo.ClusterID}
+	}); err != nil {
+		return err
+	}
 	return ctrl.NewControllerManagedBy(mgr).
 		For(&databricksv1alpha1.Dcluster{}).
 		Complete(r)
diff --git a/controllers/dcluster_controller_test.go b/controllers/dcluster_controller_test.go
index 4921b07..c33f4fa 100644
--- a/controllers/dcluster_controller_test.go
+++ b/controllers/dcluster_controller_test.go
@@ -28,7 +28,7 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 )
 
-var _ = Describe("DbfsBlock Controller", func() {
+var _ = Describe("Dcluster Controller", func() {
 
 	const timeout = time.Second * 30
 	const interval = time.Second * 1
@@ -49,7 +49,7 @@ var _ = Describe("DbfsBlock Controller", func() {
 	It("Should create successfully", func() {
 		key := types.NamespacedName{
-			Name:      "test-cluster",
+			Name:      "t-cluster" + "-" + randomStringWithCharset(10, charset),
 			Namespace: "default",
 		}
@@ -63,7 +63,7 @@
 				MinWorkers: 2,
 				MaxWorkers: 5,
 			},
-			AutoterminationMinutes: 15,
+			AutoterminationMinutes: 10,
 			NodeTypeID:             "Standard_D3_v2",
 			SparkVersion:           "5.3.x-scala2.11",
 		},
diff --git a/controllers/djob_controller_databricks.go b/controllers/djob_controller_databricks.go
index 43d5bdf..90c1007 100644
--- a/controllers/djob_controller_databricks.go
+++ b/controllers/djob_controller_databricks.go
@@ -19,23 +19,60 @@ package controllers
 import (
 	"context"
 	"fmt"
-	"reflect"
-	"strings"
 
 	databricksv1alpha1 "github.com/microsoft/azure-databricks-operator/api/v1alpha1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"reflect"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"strings"
 )
 
 func (r *DjobReconciler) submit(instance *databricksv1alpha1.Djob) error {
 	r.Log.Info(fmt.Sprintf("Submitting job %s", instance.GetName()))
 
-	instance.Spec.Name = instance.GetName()
-
-	job, err := r.APIClient.Jobs().Create(*instance.Spec)
+	// Get the existing Databricks cluster by cluster name and set ExistingClusterID, or
+	// get the existing Databricks cluster by cluster ID.
+	var ownerInstance databricksv1alpha1.Dcluster
+	if len(instance.Spec.ExistingClusterName) > 0 {
+		dClusterNamespacedName := types.NamespacedName{Name: instance.Spec.ExistingClusterName, Namespace: instance.Namespace}
+		err := r.Get(context.Background(), dClusterNamespacedName, &ownerInstance)
+		if err != nil {
+			return err
+		}
+		if (ownerInstance.Status != nil) && (ownerInstance.Status.ClusterInfo != nil) && len(ownerInstance.Status.ClusterInfo.ClusterID) > 0 {
+			instance.Spec.ExistingClusterID = ownerInstance.Status.ClusterInfo.ClusterID
+		} else {
+			return fmt.Errorf("failed to get ClusterID of %v", instance.Spec.ExistingClusterName)
+		}
+	} else if len(instance.Spec.ExistingClusterID) > 0 {
+		var dclusters databricksv1alpha1.DclusterList
+		err := r.List(context.Background(), &dclusters, client.InNamespace(instance.Namespace), client.MatchingField(dclusterIndexKey, instance.Spec.ExistingClusterID))
+		if err != nil {
+			return err
+		}
+		if len(dclusters.Items) == 1 {
+			ownerInstance = dclusters.Items[0]
+		} else {
+			return fmt.Errorf("failed to get ClusterID of %v", instance.Spec.ExistingClusterID)
+		}
+	}
+	// Set the existing cluster as owner of the job.
+	if len(ownerInstance.APIVersion) > 0 && len(ownerInstance.Kind) > 0 && len(ownerInstance.GetName()) > 0 {
+		references := []metav1.OwnerReference{
+			{
+				APIVersion: ownerInstance.APIVersion,
+				Kind:       ownerInstance.Kind,
+				Name:       ownerInstance.GetName(),
+				UID:        ownerInstance.GetUID(),
+			},
+		}
+		instance.ObjectMeta.SetOwnerReferences(references)
+	}
+	jobSettings := databricksv1alpha1.ToDatabricksJobSettings(instance.Spec)
+	job, err := r.APIClient.Jobs().Create(jobSettings)
 	if err != nil {
 		return err
 	}
 
-	instance.Spec.Name = instance.GetName()
 	instance.Status = &databricksv1alpha1.DjobStatus{
 		JobStatus: &job,
diff --git a/controllers/djob_controller_test.go b/controllers/djob_controller_test.go
index ff5c798..41507a6 100644
--- a/controllers/djob_controller_test.go
+++ b/controllers/djob_controller_test.go
@@ -26,14 +26,13 @@ import (
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/types"
+	types "k8s.io/apimachinery/pkg/types"
 )
 
 var _ = Describe("Djob Controller", func() {
 
 	const timeout = time.Second * 30
 	const interval = time.Second * 1
-
 	BeforeEach(func() {
 		// Add any setup steps that needs to be executed before each test
 	})
@@ -46,19 +45,19 @@ var _ = Describe("Djob Controller", func() {
 	// your API definition.
 	// Avoid adding tests for vanilla CRUD operations because they would
 	// test Kubernetes API server, which isn't the goal here.
-	Context("Job with schedule", func() {
+	Context("Job with schedule on New Cluster", func() {
 		It("Should create successfully", func() {
-			key := types.NamespacedName{
-				Name:      "integreation-test-job-with-schedule",
+			testDjobkey := types.NamespacedName{
+				Name:      "t-job-with-schedule-new-cluster" + "-" + randomStringWithCharset(10, charset),
 				Namespace: "default",
 			}
 
-			spec := &dbmodels.JobSettings{
+			spec := databricksv1alpha1.JobSettings{
 				NewCluster: &dbmodels.NewCluster{
 					SparkVersion: "5.3.x-scala2.11",
 					NodeTypeID:   "Standard_D3_v2",
-					NumWorkers:   10,
+					NumWorkers:   2,
 				},
 				Libraries: []dbmodels.Library{
 					{
@@ -81,12 +80,201 @@
 				},
 			}
 
+			created := &databricksv1alpha1.Djob{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      testDjobkey.Name,
+					Namespace: testDjobkey.Namespace,
+				},
+				Spec: &spec,
+			}
+
+			// Create
+			Expect(k8sClient.Create(context.Background(), created)).Should(Succeed())
+
+			By("Expecting submitted")
+			Eventually(func() bool {
+				f := &databricksv1alpha1.Djob{}
+				_ = k8sClient.Get(context.Background(), testDjobkey, f)
+				return f.IsSubmitted()
+			}, timeout, interval).Should(BeTrue())
+
+			// Delete
+			By("Expecting to delete successfully")
+			Eventually(func() error {
+				f := &databricksv1alpha1.Djob{}
+				_ = k8sClient.Get(context.Background(), testDjobkey, f)
+				return k8sClient.Delete(context.Background(), f)
+			}, timeout, interval).Should(Succeed())
+
+			By("Expecting to delete finish")
+			Eventually(func() error {
+				f := &databricksv1alpha1.Djob{}
+				return k8sClient.Get(context.Background(), testDjobkey, f)
+			}, timeout, interval).ShouldNot(Succeed())
+		})
+	})
+	Context("Job with schedule on Existing Cluster", func() {
+
+		var testDclusterKey types.NamespacedName
+
+		BeforeEach(func() {
+			// Add any setup steps that needs to be executed before each test
+			testDclusterKey = types.NamespacedName{
+				Name:      "t-cluster" + "-" + randomStringWithCharset(10, charset),
+				Namespace: "default",
+			}
+
+			dcluster := &databricksv1alpha1.Dcluster{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      testDclusterKey.Name,
+					Namespace: testDclusterKey.Namespace,
+				},
+				Spec: &dbmodels.NewCluster{
+					Autoscale: &dbmodels.AutoScale{
+						MinWorkers: 2,
+						MaxWorkers: 3,
+					},
+					AutoterminationMinutes: 10,
+					NodeTypeID:             "Standard_D3_v2",
+					SparkVersion:           "5.3.x-scala2.11",
+				},
+			}
+
+			// Create testDcluster
+			_ = k8sClient.Create(context.Background(), dcluster)
+			testK8sDcluster := &databricksv1alpha1.Dcluster{}
+			Eventually(func() error {
+				return k8sClient.Get(context.Background(), testDclusterKey, testK8sDcluster)
+			}, timeout, interval).Should(Succeed())
+
+		})
+
+		AfterEach(func() {
+			// Add any teardown steps that needs to be executed after each test
+			// Delete test Dcluster
+			f := &databricksv1alpha1.Dcluster{}
+			_ = k8sClient.Get(context.Background(), testDclusterKey, f)
+			_ = k8sClient.Delete(context.Background(), f)
+		})
+
+		It("Should create successfully on Existing Cluster by name", func() {
+
+			testK8sDcluster := &databricksv1alpha1.Dcluster{}
+			By("Expecting Dcluster submitted")
+			Eventually(func() bool {
+				_ = k8sClient.Get(context.Background(), testDclusterKey, testK8sDcluster)
+				return testK8sDcluster.IsSubmitted()
+			}, timeout, interval).Should(BeTrue())
+
+			Expect(testK8sDcluster.Status).ShouldNot(BeNil())
+			Expect(testK8sDcluster.Status.ClusterInfo).ShouldNot(BeNil())
+			key := types.NamespacedName{
+				Name:      "t-job-with-schedule-existing-cluster" + "-" + testK8sDcluster.GetName(),
+				Namespace: "default",
+			}
+
+			spec := databricksv1alpha1.JobSettings{
+				ExistingClusterName: testK8sDcluster.GetName(),
+				Libraries: []dbmodels.Library{
+					{
+						Jar: "dbfs:/my-jar.jar",
+					},
+					{
+						Maven: &dbmodels.MavenLibrary{
+							Coordinates: "org.jsoup:jsoup:1.7.2",
+						},
+					},
+				},
+				TimeoutSeconds: 3600,
+				MaxRetries:     1,
+				Schedule: &dbmodels.CronSchedule{
+					QuartzCronExpression: "0 15 22 ? * *",
+					TimezoneID:           "America/Los_Angeles",
+				},
+				SparkJarTask: &dbmodels.SparkJarTask{
+					MainClassName: "com.databricks.ComputeModels",
+				},
+			}
+
+			created := &databricksv1alpha1.Djob{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      key.Name,
+					Namespace: key.Namespace,
+				},
+				Spec: &spec,
+			}
+
+			// Create
+			Expect(k8sClient.Create(context.Background(), created)).Should(Succeed())
+
+			By("Expecting submitted")
+			Eventually(func() bool {
+				f := &databricksv1alpha1.Djob{}
+				_ = k8sClient.Get(context.Background(), key, f)
+				return f.IsSubmitted()
+			}, timeout, interval).Should(BeTrue())
+
+			// Delete
+			By("Expecting to delete successfully")
+			Eventually(func() error {
+				f := &databricksv1alpha1.Djob{}
+				_ = k8sClient.Get(context.Background(), key, f)
+				return k8sClient.Delete(context.Background(), f)
+			}, timeout, interval).Should(Succeed())
+
+			By("Expecting to delete finish")
+			Eventually(func() error {
+				f := &databricksv1alpha1.Djob{}
+				return k8sClient.Get(context.Background(), key, f)
+			}, timeout, interval).ShouldNot(Succeed())
+
+		})
+
+		It("Should create successfully on Existing Cluster using ID", func() {
+
+			testK8sDcluster := &databricksv1alpha1.Dcluster{}
+			By("Expecting Dcluster submitted")
+			Eventually(func() bool {
+				_ = k8sClient.Get(context.Background(), testDclusterKey, testK8sDcluster)
+				return testK8sDcluster.IsSubmitted()
+			}, timeout, interval).Should(BeTrue())
+
+			Expect(testK8sDcluster.Status).ShouldNot(BeNil())
+			Expect(testK8sDcluster.Status.ClusterInfo).ShouldNot(BeNil())
+			key := types.NamespacedName{
+				Name:      "t-job-with-schedule-existing-cluster" + "-" + testK8sDcluster.Status.ClusterInfo.ClusterID,
+				Namespace: "default",
+			}
+
+			spec := databricksv1alpha1.JobSettings{
+				ExistingClusterID: testK8sDcluster.Status.ClusterInfo.ClusterID,
+				Libraries: []dbmodels.Library{
+					{
+						Jar: "dbfs:/my-jar.jar",
+					},
+					{
+						Maven: &dbmodels.MavenLibrary{
+							Coordinates: "org.jsoup:jsoup:1.7.2",
+						},
+					},
+				},
+				TimeoutSeconds: 3600,
+				MaxRetries:     1,
+				Schedule: &dbmodels.CronSchedule{
+					QuartzCronExpression: "0 15 22 ? * *",
+					TimezoneID:           "America/Los_Angeles",
+				},
+				SparkJarTask: &dbmodels.SparkJarTask{
+					MainClassName: "com.databricks.ComputeModels",
+				},
+			}
+
 			created := &databricksv1alpha1.Djob{
 				ObjectMeta: metav1.ObjectMeta{
 					Name:      key.Name,
 					Namespace: key.Namespace,
 				},
-				Spec: spec,
+				Spec: &spec,
 			}
 
 			// Create
@@ -112,6 +300,9 @@
 				f := &databricksv1alpha1.Djob{}
 				return k8sClient.Get(context.Background(), key, f)
 			}, timeout, interval).ShouldNot(Succeed())
+		})
+	})
+
 })
diff --git a/controllers/run_controller_test.go b/controllers/run_controller_test.go
index 8b512d4..b56c585 100644
--- a/controllers/run_controller_test.go
+++ b/controllers/run_controller_test.go
@@ -55,11 +55,11 @@ var _ = Describe("Run Controller", func() {
 	It("Should create successfully", func() {
 		By("Create job for run")
 		jobKey := types.NamespacedName{
-			Name:      "integreation-test-job-for-run",
+			Name:      "t-job-for-run" + "-" + randomStringWithCharset(10, charset),
 			Namespace: "default",
 		}
 
-		jobSpec := &dbmodels.JobSettings{
+		jobSpec := &databricksv1alpha1.JobSettings{
 			NewCluster: &dbmodels.NewCluster{
 				SparkVersion: "5.3.x-scala2.11",
 				NodeTypeID:   "Standard_D3_v2",
@@ -99,7 +99,7 @@
 		By("Create the run itself")
 		runKey := types.NamespacedName{
-			Name:      "integreation-test-job-for-run-run",
+			Name:      "t-job-for-run-run" + "-" + randomStringWithCharset(10, charset),
 			Namespace: "default",
 		}
diff --git a/controllers/secretscope_controller_test.go b/controllers/secretscope_controller_test.go
index 7774b59..5ddb78a 100644
--- a/controllers/secretscope_controller_test.go
+++ b/controllers/secretscope_controller_test.go
@@ -27,7 +27,6 @@ import (
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
-	"math/rand"
 )
 
 var _ = Describe("SecretScope Controller", func() {
@@ -36,8 +35,8 @@ var _ = Describe("SecretScope Controller", func() {
 	const interval = time.Second * 1
 
 	const charset = "abcdefghijklmnopqrstuvwxyz"
-	const aclKeyName = "secretscope-with-acls"
-	const secretsKeyName = "secretscope-with-secrets"
+	var aclKeyName = "t-secretscope-with-acls" + randomStringWithCharset(10, charset)
+	var secretsKeyName = "t-secretscope-with-secrets" + randomStringWithCharset(10, charset)
 
 	BeforeEach(func() {
 		// failed test runs that don't clean up leave resources behind.
@@ -67,10 +66,8 @@ var _ = Describe("SecretScope Controller", func() {
 			},
 		}
 
-		name := aclKeyName + "-" + randomStringWithCharset(10, charset)
-
 		key := types.NamespacedName{
-			Name:      name,
+			Name:      aclKeyName,
 			Namespace: "default",
 		}
@@ -131,7 +128,7 @@ var _ = Describe("SecretScope Controller", func() {
 		// setup k8s secret
 		k8SecretKey := types.NamespacedName{
-			Name:      "k8secret",
+			Name:      "t-k8secret",
 			Namespace: "default",
 		}
@@ -159,7 +156,7 @@ var _ = Describe("SecretScope Controller", func() {
 				Key: "secretFromSecret",
 				ValueFrom: &databricksv1alpha1.SecretScopeValueFrom{
 					SecretKeyRef: databricksv1alpha1.SecretScopeKeyRef{
-						Name: "k8secret",
+						Name: "t-k8secret",
 						Key:  "username",
 					},
 				},
@@ -177,10 +174,8 @@ var _ = Describe("SecretScope Controller", func() {
 			},
 		}
 
-		name := secretsKeyName + "-" + randomStringWithCharset(10, charset)
-
 		key := types.NamespacedName{
-			Name:      name,
+			Name:      secretsKeyName,
 			Namespace: "default",
 		}
@@ -240,12 +235,3 @@ var _ = Describe("SecretScope Controller", func() {
 		})
 	})
 })
-
-func randomStringWithCharset(length int, charset string) string {
-	var seededRand = rand.New(rand.NewSource(time.Now().UnixNano()))
-	b := make([]byte, length)
-	for i := range b {
-		b[i] = charset[seededRand.Intn(len(charset))]
-	}
-	return string(b)
-}
diff --git a/controllers/suite_test.go b/controllers/suite_test.go
index 7c26f15..25a9f07 100644
--- a/controllers/suite_test.go
+++ b/controllers/suite_test.go
@@ -17,16 +17,16 @@ limitations under the License.
 package controllers
 
 import (
-	"os"
-	"path/filepath"
-	"testing"
-	"time"
-
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
 	"github.com/onsi/gomega/gexec"
 	db "github.com/xinsnake/databricks-sdk-golang"
 	dbazure "github.com/xinsnake/databricks-sdk-golang/azure"
+	"math/rand"
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
 
 	databricksv1alpha1 "github.com/microsoft/azure-databricks-operator/api/v1alpha1"
 	"k8s.io/client-go/kubernetes/scheme"
@@ -162,3 +162,14 @@ var _ = AfterSuite(func() {
 	err := testEnv.Stop()
 	Expect(err).ToNot(HaveOccurred())
 })
+
+const charset = "abcdefghijklmnopqrstuvwxyz"
+
+func randomStringWithCharset(length int, charset string) string {
+	var seededRand = rand.New(rand.NewSource(time.Now().UnixNano()))
+	b := make([]byte, length)
+	for i := range b {
+		b[i] = charset[seededRand.Intn(len(charset))]
+	}
+	return string(b)
+}
diff --git a/controllers/workspaceitem_controller_databricks.go b/controllers/workspaceitem_controller_databricks.go
index c99c284..3873bc0 100644
--- a/controllers/workspaceitem_controller_databricks.go
+++ b/controllers/workspaceitem_controller_databricks.go
@@ -28,6 +28,9 @@ import (
 func (r *WorkspaceItemReconciler) submit(instance *databricksv1alpha1.WorkspaceItem) error {
 	r.Log.Info(fmt.Sprintf("Create item %s", instance.GetName()))
 
+	if instance.Spec == nil || len(instance.Spec.Content) <= 0 {
+		return fmt.Errorf("workspace item content is empty")
+	}
 	data, err := base64.StdEncoding.DecodeString(instance.Spec.Content)
 	if err != nil {
 		return err
diff --git a/controllers/workspaceitem_controller_test.go b/controllers/workspaceitem_controller_test.go
index d8b5886..27e4f56 100644
--- a/controllers/workspaceitem_controller_test.go
+++ b/controllers/workspaceitem_controller_test.go
@@ -48,7 +48,7 @@ var _ = Describe("WorkspaceItem Controller", func() {
 	It("Should create successfully", func() {
 		key := types.NamespacedName{
-			Name:      "block-greater-than-1mb",
+			Name:      "t-workspace-item" + randomStringWithCharset(10, charset),
 			Namespace: "default",
 		}
diff --git a/docs/images/azure-databricks-operator-highlevel.jpg b/docs/images/azure-databricks-operator-highlevel.jpg
new file mode 100644
index 0000000..de9927c
Binary files /dev/null and b/docs/images/azure-databricks-operator-highlevel.jpg differ
diff --git a/docs/samples/2_job_run/run_basic1_periodic_on_existing_cluster_by_name.yaml b/docs/samples/2_job_run/run_basic1_periodic_on_existing_cluster_by_name.yaml
new file mode 100644
index 0000000..efcf88f
--- /dev/null
+++ b/docs/samples/2_job_run/run_basic1_periodic_on_existing_cluster_by_name.yaml
@@ -0,0 +1,17 @@
+apiVersion: databricks.microsoft.com/v1alpha1
+kind: Djob
+metadata:
+  name: djob-basic-with-cluster-name
+spec:
+  # This spec is directly linked to the JobSettings structure
+  # https://docs.databricks.com/api/latest/jobs.html#jobsettings
+  existing_cluster_name: dcluster-interactive1
+  timeout_seconds: 3600
+  max_retries: 1
+  schedule:
+    quartz_cron_expression: 0 0/1 * * * ?
+    timezone_id: America/Los_Angeles
+  notebook_task:
+    base_parameters:
+      "name": "Azadeh"
+    notebook_path: "/samples/basic1"
diff --git a/docs/samples/2_job_run/run_basic1_periodic_on_new_cluster.yaml b/docs/samples/2_job_run/run_basic1_periodic_on_new_cluster.yaml
new file mode 100644
index 0000000..15dc65f
--- /dev/null
+++ b/docs/samples/2_job_run/run_basic1_periodic_on_new_cluster.yaml
@@ -0,0 +1,20 @@
+apiVersion: databricks.microsoft.com/v1alpha1
+kind: Djob
+metadata:
+  name: djob-basic-on-new-cluster
+spec:
+  # This spec is directly linked to the JobSettings structure
+  # https://docs.databricks.com/api/latest/jobs.html#jobsettings
+  new_cluster:
+    spark_version: 5.3.x-scala2.11
+    node_type_id: Standard_D3_v2
+    num_workers: 3
+  timeout_seconds: 3600
+  max_retries: 1
+  schedule:
+    quartz_cron_expression: 0 0/5 * * * ?
+    timezone_id: America/Los_Angeles
+  notebook_task:
+    base_parameters:
+      "name": "Azadeh"
+    notebook_path: "/samples/basic1"
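
For reference, the `existing_cluster_name` / `existing_cluster_id` resolution in this change has two halves: `DclusterReconciler.SetupWithManager` registers a field index on `.status.cluster_info.cluster_id`, and `DjobReconciler.submit` queries that index to find the owning Dcluster. Below is a minimal sketch of the lookup half, factored into a standalone helper; `dclusterIndexKey` and `client.MatchingField` are exactly what the diff uses, while the helper name and factoring are illustrative, not part of the change.

```go
package controllers

import (
	"context"
	"fmt"

	databricksv1alpha1 "github.com/microsoft/azure-databricks-operator/api/v1alpha1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// findDclusterByClusterID looks up the Dcluster whose status carries the given
// Databricks cluster ID, using the index registered in SetupWithManager.
// Illustrative helper only; the diff inlines this logic in submit().
func findDclusterByClusterID(c client.Client, namespace, clusterID string) (*databricksv1alpha1.Dcluster, error) {
	var dclusters databricksv1alpha1.DclusterList
	// The field index turns this List into an indexed lookup instead of a
	// scan over every Dcluster in the namespace.
	if err := c.List(context.Background(), &dclusters,
		client.InNamespace(namespace),
		client.MatchingField(dclusterIndexKey, clusterID)); err != nil {
		return nil, err
	}
	if len(dclusters.Items) != 1 {
		return nil, fmt.Errorf("expected exactly one Dcluster with cluster_id %q, found %d", clusterID, len(dclusters.Items))
	}
	return &dclusters.Items[0], nil
}
```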
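Likewise, a rough sketch of how a Djob spec travels to the Databricks Jobs API once the cluster is resolved: `submit` copies the resolved cluster ID into the spec, then `ToDatabricksJobSettings` produces the SDK type, deliberately dropping `ExistingClusterName` because the Jobs API only understands `existing_cluster_id`. The cluster name and ID values below are placeholders.

```go
package controllers

import (
	databricksv1alpha1 "github.com/microsoft/azure-databricks-operator/api/v1alpha1"
	dbmodels "github.com/xinsnake/databricks-sdk-golang/azure/models"
)

func exampleJobSettingsConversion() dbmodels.JobSettings {
	spec := &databricksv1alpha1.JobSettings{
		// Kubernetes object name of a Dcluster (placeholder value).
		ExistingClusterName: "dcluster-interactive1",
		NotebookTask: &dbmodels.NotebookTask{
			NotebookPath: "/samples/basic1",
		},
	}

	// submit() fills this in from the Dcluster's status before converting;
	// the ID here is a placeholder.
	spec.ExistingClusterID = "0711-035521-puny598"

	// The output type has no ExistingClusterName field: only the resolved
	// existing_cluster_id is sent to Databricks.
	return databricksv1alpha1.ToDatabricksJobSettings(spec)
}
```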