2 changes: 1 addition & 1 deletion api/v1alpha1/zz_generated.deepcopy.go
@@ -16,7 +16,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

// Code generated by controller-gen. DO NOT EDIT.
// autogenerated by controller-gen object, do not modify manually

package v1alpha1

19 changes: 18 additions & 1 deletion controllers/dbfsblock_controller_databricks.go
@@ -34,34 +34,48 @@ func (r *DbfsBlockReconciler) submit(instance *databricksv1alpha1.DbfsBlock) err
}

// Open handler
execution := NewExecution("dbfsblocks", "create")
createResponse, err := r.APIClient.Dbfs().Create(instance.Spec.Path, true)
execution.Finish(err)

if err != nil {
return err
}

// DataBricks limits the AddBlock size to be 1024KB
var g = 1000
for i := 0; i < len(data); i += g {
execution = NewExecution("dbfsblocks", "add_block")

if i+g <= len(data) {
err = r.APIClient.Dbfs().AddBlock(createResponse.Handle, data[i:i+g])
} else {
err = r.APIClient.Dbfs().AddBlock(createResponse.Handle, data[i:])
}

execution.Finish(err)

if err != nil {
return err
}
}

// Close handler
execution = NewExecution("dbfsblocks", "close")
err = r.APIClient.Dbfs().Close(createResponse.Handle)
execution.Finish(err)

if err != nil {
return err
}

time.Sleep(1 * time.Second)

// Refresh info
execution = NewExecution("dbfsblocks", "get_status")
fileInfo, err := r.APIClient.Dbfs().GetStatus(instance.Spec.Path)
execution.Finish(err)

if err != nil {
return err
}
@@ -83,5 +97,8 @@ func (r *DbfsBlockReconciler) delete(instance *databricksv1alpha1.DbfsBlock) err

path := instance.Status.FileInfo.Path

return r.APIClient.Dbfs().Delete(path, true)
execution := NewExecution("dbfsblocks", "delete")
err := r.APIClient.Dbfs().Delete(path, true)
execution.Finish(err)
return err
}
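
For readers skimming the diff: the loop in the submit handler above uploads the file contents in fixed-size blocks because DBFS caps the size of each AddBlock call. The slicing logic can be exercised on its own; below is a minimal, runnable sketch of the same boundary handling (the chunkData helper and the sample data are hypothetical, not part of this PR).

```go
package main

import "fmt"

// chunkData splits data into blocks of at most blockSize bytes, mirroring
// the slicing logic in the AddBlock loop above. Hypothetical helper for
// illustration only; not part of the PR.
func chunkData(data []byte, blockSize int) [][]byte {
	var blocks [][]byte
	for i := 0; i < len(data); i += blockSize {
		end := i + blockSize
		if end > len(data) {
			end = len(data)
		}
		blocks = append(blocks, data[i:end])
	}
	return blocks
}

func main() {
	data := []byte("0123456789")
	for _, block := range chunkData(data, 4) {
		fmt.Printf("%s\n", block) // prints "0123", "4567", "89"
	}
}
```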
26 changes: 8 additions & 18 deletions controllers/dcluster_controller_databricks.go
@@ -22,7 +22,6 @@ import (
"reflect"

databricksv1alpha1 "github.com/microsoft/azure-databricks-operator/api/v1alpha1"
"github.com/prometheus/client_golang/prometheus"
dbmodels "github.com/xinsnake/databricks-sdk-golang/azure/models"
)

@@ -80,31 +79,22 @@ func (r *DclusterReconciler) delete(instance *databricksv1alpha1.Dcluster) error
return nil
}

return trackExecutionTime(dclusterDeleteDuration, func() error {
err := r.APIClient.Clusters().PermanentDelete(instance.Status.ClusterInfo.ClusterID)
trackSuccessFailure(err, dclusterCounterVec, "delete")
return err
})
execution := NewExecution("dclusters", "delete")
err := r.APIClient.Clusters().PermanentDelete(instance.Status.ClusterInfo.ClusterID)
execution.Finish(err)
return err
}

func (r *DclusterReconciler) getCluster(clusterID string) (cluster dbmodels.ClusterInfo, err error) {
timer := prometheus.NewTimer(dclusterGetDuration)
defer timer.ObserveDuration()

execution := NewExecution("dclusters", "get")
cluster, err = r.APIClient.Clusters().Get(clusterID)

trackSuccessFailure(err, dclusterCounterVec, "get")

execution.Finish(err)
return cluster, err
}

func (r *DclusterReconciler) createCluster(instance *databricksv1alpha1.Dcluster) (cluster dbmodels.ClusterInfo, err error) {
timer := prometheus.NewTimer(dclusterCreateDuration)
defer timer.ObserveDuration()

execution := NewExecution("dclusters", "create")
cluster, err = r.APIClient.Clusters().Create(*instance.Spec)

trackSuccessFailure(err, dclusterCounterVec, "create")

execution.Finish(err)
return cluster, err
}
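
The get/create methods above call execution.Finish(err) on the straight-line path, which is sufficient because each has a single return after the API call. A hedged alternative, not part of this PR: deferring Finish in a closure over the named error return guarantees the observation is recorded even if early returns are added later. A sketch, assuming the Execution helper and the types already present in this package (the method name is hypothetical):

```go
// Sketch only: defer Finish via a closure over the named return value err,
// so the outcome label always reflects whatever the method returns.
func (r *DclusterReconciler) getClusterDeferred(clusterID string) (cluster dbmodels.ClusterInfo, err error) {
	execution := NewExecution("dclusters", "get")
	defer func() { execution.Finish(err) }()

	cluster, err = r.APIClient.Clusters().Get(clusterID)
	return cluster, err
}
```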
53 changes: 0 additions & 53 deletions controllers/dcluster_metrics.go

This file was deleted.

32 changes: 11 additions & 21 deletions controllers/djob_controller_databricks.go
@@ -23,7 +23,6 @@ import (
"strings"

databricksv1alpha1 "github.com/microsoft/azure-databricks-operator/api/v1alpha1"
"github.com/prometheus/client_golang/prometheus"
dbmodels "github.com/xinsnake/databricks-sdk-golang/azure/models"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@@ -71,7 +70,8 @@ func (r *DjobReconciler) submit(instance *databricksv1alpha1.Djob) error {
}
instance.ObjectMeta.SetOwnerReferences(references)
}
job, err := r.createJob(instance)
jobSettings := databricksv1alpha1.ToDatabricksJobSettings(instance.Spec)
job, err := r.createJob(jobSettings)

if err != nil {
return err
@@ -138,32 +138,22 @@ func (r *DjobReconciler) delete(instance *databricksv1alpha1.Djob) error {
return err
}

return trackExecutionTime(djobDeleteDuration, func() error {
err := r.APIClient.Jobs().Delete(jobID)
trackSuccessFailure(err, djobCounterVec, "delete")
return err
})
execution := NewExecution("djobs", "delete")
err := r.APIClient.Jobs().Delete(jobID)
execution.Finish(err)
return err
}

func (r *DjobReconciler) getJob(jobID int64) (job dbmodels.Job, err error) {
timer := prometheus.NewTimer(djobGetDuration)
defer timer.ObserveDuration()

execution := NewExecution("djobs", "get")
job, err = r.APIClient.Jobs().Get(jobID)

trackSuccessFailure(err, djobCounterVec, "get")

execution.Finish(err)
return job, err
}

func (r *DjobReconciler) createJob(instance *databricksv1alpha1.Djob) (job dbmodels.Job, err error) {
timer := prometheus.NewTimer(djobCreateDuration)
defer timer.ObserveDuration()

jobSettings := databricksv1alpha1.ToDatabricksJobSettings(instance.Spec)
func (r *DjobReconciler) createJob(jobSettings dbmodels.JobSettings) (job dbmodels.Job, err error) {
execution := NewExecution("djobs", "create")
job, err = r.APIClient.Jobs().Create(jobSettings)

trackSuccessFailure(err, djobCounterVec, "create")

execution.Finish(err)
return job, err
}
53 changes: 0 additions & 53 deletions controllers/djob_metrics.go

This file was deleted.

39 changes: 31 additions & 8 deletions controllers/metrics.go
@@ -17,25 +17,48 @@ limitations under the License.
package controllers

import (
"time"

"github.com/prometheus/client_golang/prometheus"
"sigs.k8s.io/controller-runtime/pkg/metrics"
)

const (
metricPrefix = "databricks_"
successMetric = "success"
failureMetric = "failure"
)

func trackExecutionTime(histogram prometheus.Histogram, f func() error) error {
timer := prometheus.NewTimer(histogram)
defer timer.ObserveDuration()
return f()
var databricksRequestHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "databricks_request_duration_seconds",
Help: "Duration of upstream calls to Databricks REST service endpoints",
}, []string{"object_type", "action", "outcome"})

func init() {
// Register custom metrics with the global prometheus registry
metrics.Registry.MustRegister(databricksRequestHistogram)
}

// NewExecution creates an Execution instance and starts the timer
func NewExecution(objectType string, action string) Execution {
return Execution{
begin: time.Now(),
labels: prometheus.Labels{"object_type": objectType, "action": action},
}
}

// Execution tracks state for an API execution for emitting metrics
type Execution struct {
begin time.Time
labels prometheus.Labels
}

func trackSuccessFailure(err error, counterVec *prometheus.CounterVec, method string) {
// Finish is used to log duration and success/failure
func (e *Execution) Finish(err error) {
if err == nil {
counterVec.With(prometheus.Labels{"status": successMetric, "method": method}).Inc()
e.labels["outcome"] = successMetric
} else {
counterVec.With(prometheus.Labels{"status": failureMetric, "method": method}).Inc()
e.labels["outcome"] = failureMetric
}
duration := time.Since(e.begin)
databricksRequestHistogram.With(e.labels).Observe(duration.Seconds())
}
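
To make the new pattern in controllers/metrics.go concrete, here is a standalone, runnable sketch that mirrors the Execution helper with lowercased copies; only the metric name, the label set, and the shape of the helper come from this PR, while the simulated API call and the printed output are made up for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// Mirrors databricksRequestHistogram from the PR (registration with the
// controller-runtime registry is omitted in this sketch).
var requestHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
	Name: "databricks_request_duration_seconds",
	Help: "Duration of upstream calls to Databricks REST service endpoints",
}, []string{"object_type", "action", "outcome"})

// execution is a lowercased copy of the PR's Execution type.
type execution struct {
	begin  time.Time
	labels prometheus.Labels
}

func newExecution(objectType, action string) execution {
	return execution{
		begin:  time.Now(),
		labels: prometheus.Labels{"object_type": objectType, "action": action},
	}
}

// finish sets the outcome label and observes the elapsed time.
func (e *execution) finish(err error) {
	if err == nil {
		e.labels["outcome"] = "success"
	} else {
		e.labels["outcome"] = "failure"
	}
	requestHistogram.With(e.labels).Observe(time.Since(e.begin).Seconds())
}

func main() {
	e := newExecution("djobs", "create")
	err := func() error { // stands in for an upstream call such as Jobs().Create
		time.Sleep(25 * time.Millisecond)
		return errors.New("simulated failure")
	}()
	e.finish(err)

	fmt.Println("recorded labels:", e.labels)
	// One observation now sits in databricks_request_duration_seconds with
	// labels {object_type="djobs", action="create", outcome="failure"}.
}
```

Compared with the deleted per-controller counter and duration metrics, a single HistogramVec keyed by object_type, action, and outcome yields both latency and success/failure counts from one metric family.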