Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 59 additions & 15 deletions bundle/run/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/databricks/bricks/bundle"
"github.com/databricks/bricks/bundle/config/resources"
"github.com/databricks/bricks/libs/flags"
"github.com/databricks/bricks/libs/log"
"github.com/databricks/databricks-sdk-go/retries"
"github.com/databricks/databricks-sdk-go/service/jobs"
Expand Down Expand Up @@ -137,20 +138,11 @@ func (r *jobRunner) logFailedTasks(ctx context.Context, runId int64) {
yellow(task.TaskKey), task.State.LifeCycleState)
}
}

}

func (r *jobRunner) Run(ctx context.Context, opts *Options) (RunOutput, error) {
jobID, err := strconv.ParseInt(r.job.ID, 10, 64)
if err != nil {
return nil, fmt.Errorf("job ID is not an integer: %s", r.job.ID)
}

func logDebugCallback(ctx context.Context, runId *int64) func(info *retries.Info[jobs.Run]) {
var prevState *jobs.RunState
var runId *int64

// This function is called each time the function below polls the run status.
update := func(info *retries.Info[jobs.Run]) {
return func(info *retries.Info[jobs.Run]) {
i := info.Info
if i == nil {
return
Expand All @@ -169,23 +161,75 @@ func (r *jobRunner) Run(ctx context.Context, opts *Options) (RunOutput, error) {
log.Infof(ctx, "Run status: %s", info.Info.State.LifeCycleState)
prevState = state
}
if runId == nil {
runId = &i.RunId
if *runId == 0 {
*runId = i.RunId
}
}
}

func logProgressCallback(ctx context.Context, progressLogger *JobProgressLogger) func(info *retries.Info[jobs.Run]) {
return func(info *retries.Info[jobs.Run]) {
i := info.Info
if i == nil {
return
}

state := i.State
if state == nil {
return
}

event := &JobProgressEvent{
Timestamp: time.Now(),
JobId: i.JobId,
RunId: i.RunId,
RunName: i.RunName,
State: *i.State,
RunPageURL: i.RunPageUrl,
}

// log progress events to stderr
progressLogger.Log(event)

// log progress events using the default logger
ctx = log.NewContext(ctx, log.GetLogger(ctx).With("event_type", "progress_event_job"))
log.Infof(ctx, event.String())

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a follow up this could be done in the progress logger itself.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I intentionally kept it outside in case we would like to log more information than what's being printed on stderr. For example, we can decide to print the entire JSON blob even if the print mode is append, just so more information is logged.

However, I think the right call is to simply just print the string even if the progress logger is printing json. We can decide to print more information if it seems useful later. It goes against making the log-file a dumping ground for everything, but I would rather avoid unnecessary clutter in our logs. WDYT?

}
}

func (r *jobRunner) Run(ctx context.Context, opts *Options) (RunOutput, error) {
jobID, err := strconv.ParseInt(r.job.ID, 10, 64)
if err != nil {
return nil, fmt.Errorf("job ID is not an integer: %s", r.job.ID)
}

runId := new(int64)

// construct request payload from cmd line flags args
req, err := opts.Job.toPayload(jobID)
if err != nil {
return nil, err
}

// Include resource key in logger.
ctx = log.NewContext(ctx, log.GetLogger(ctx).With("resource", r.Key()))

w := r.bundle.WorkspaceClient()
run, err := w.Jobs.RunNowAndWait(ctx, *req, retries.Timeout[jobs.Run](jobRunTimeout), update)

// callback to log status updates to the universal log destination.
Comment thread
shreyas-goenka marked this conversation as resolved.
Outdated
// Called on every poll request
logDebug := logDebugCallback(ctx, runId)
Comment thread
shreyas-goenka marked this conversation as resolved.

// callback to log progress events. Called on every poll request
progressLogger, err := NewJobProgressLogger(opts.ProgressFormat, flags.LogLevel.String(), flags.LogFile.String())
if err != nil {
return nil, err
}
logProgress := logProgressCallback(ctx, progressLogger)

run, err := w.Jobs.RunNowAndWait(ctx, *req, retries.Timeout[jobs.Run](jobRunTimeout), logDebug, logProgress)
if err != nil && runId != nil {
r.logFailedTasks(ctx, *runId)

}
if err != nil {
return nil, err
Expand Down
74 changes: 74 additions & 0 deletions bundle/run/job_progress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package run

import (
"encoding/json"
"fmt"
"os"
"strings"
"time"

"github.com/databricks/bricks/libs/flags"
"github.com/databricks/databricks-sdk-go/service/jobs"
)

// JobProgressEvent is a snapshot of a job run's status at one point in time.
// It can be rendered as a single text line via String or serialised to JSON
// using the field tags below.
type JobProgressEvent struct {
// Timestamp is the moment the event was created.
Timestamp time.Time `json:"timestamp"`
// JobId identifies the job the run belongs to.
JobId int64 `json:"job_id"`
// RunId identifies the run this event describes.
RunId int64 `json:"run_id"`
// RunName is the name of the run.
RunName string `json:"run_name"`
// State carries the life cycle state, result state and state message of the run.
State jobs.RunState `json:"state"`
// RunPageURL is the URL of the run's page.
RunPageURL string `json:"run_page_url"`
}

// String renders the event as one space-separated line made up of, in order:
// the timestamp (formatted as "2006-01-02 15:04:05"), the run name, the life
// cycle state, the result state (left out when its string form is empty), the
// state message and the run page URL.
func (event *JobProgressEvent) String() string {
	state := event.State

	fields := []string{
		event.Timestamp.Format("2006-01-02 15:04:05"),
		event.RunName,
		state.LifeCycleState.String(),
	}
	// Leave the result state out entirely while it is empty so that no
	// double space shows up in the output.
	if result := state.ResultState.String(); result != "" {
		fields = append(fields, result)
	}
	fields = append(fields, state.StateMessage, event.RunPageURL)

	return strings.Join(fields, " ")
}

type JobProgressLogger struct {
Mode flags.ProgressLogFormat
prevState *jobs.RunState
Comment thread
shreyas-goenka marked this conversation as resolved.
Outdated
}

// NewJobProgressLogger builds a JobProgressLogger that prints in the given mode.
//
// It returns an error when mode is flags.ModeInplace while regular logging is
// active (logLevel is anything other than "disabled") and logFile is "stderr".
func NewJobProgressLogger(mode flags.ProgressLogFormat, logLevel string, logFile string) (*JobProgressLogger, error) {
	loggingEnabled := logLevel != "disabled"
	logsGoToStderr := logFile == "stderr"

	if mode == flags.ModeInplace && loggingEnabled && logsGoToStderr {
		return nil, fmt.Errorf("inplace progress logging cannot be used when log-file is stderr")
	}

	logger := &JobProgressLogger{Mode: mode}
	return logger, nil
}

// Log prints a progress event to stderr in the logger's configured mode.
//
// An event is skipped when both its life cycle state and its result state are
// equal to those of the previously printed event, so only state transitions
// are shown. In inplace mode every event after the first overwrites the line
// printed before it. In JSON mode the event is printed as indented JSON; in
// all other modes its String form is printed.
//
// Log panics if the event cannot be marshalled to JSON.
func (l *JobProgressLogger) Log(event *JobProgressEvent) {
	// Nothing changed since the last printed event: stay quiet.
	if l.prevState != nil && l.prevState.LifeCycleState == event.State.LifeCycleState &&
		l.prevState.ResultState == event.State.ResultState {
		return
	}

	if l.prevState != nil && l.Mode == flags.ModeInplace {
		// CSI 1 F moves the cursor to the start of the previous line and
		// CSI K erases from the cursor to the end of that line, so a shorter
		// status does not leave remnants of a longer one behind. The
		// sequences must end at their final byte: any trailing character
		// (such as "]") would be printed literally in front of the event.
		fmt.Fprint(os.Stderr, "\033[1F\033[K")
	}

	if l.Mode == flags.ModeJson {
		b, err := json.MarshalIndent(event, "", " ")
		if err != nil {
			// Log has no error return, so a marshalling failure cannot be
			// handed back to the caller; panic instead of dropping it.
			panic(err)
		}
		fmt.Fprintln(os.Stderr, string(b))
	} else {
		fmt.Fprintln(os.Stderr, event.String())
	}

	// Remember a copy of the state rather than a pointer into the caller's
	// event, so later changes to the event cannot affect the comparison above.
	state := event.State
	l.prevState = &state
}
41 changes: 41 additions & 0 deletions bundle/run/job_progress_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package run

import (
"testing"
"time"

"github.com/databricks/bricks/libs/flags"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/stretchr/testify/assert"
)

func TestJobProgressEventString(t *testing.T) {
event := &JobProgressEvent{
Timestamp: time.Date(0, 0, 0, 0, 0, 0, 0, &time.Location{}),
JobId: 123,
RunId: 456,
RunName: "run_name",
State: jobs.RunState{
LifeCycleState: jobs.RunLifeCycleStateTerminated,
ResultState: jobs.RunResultStateSuccess,
StateMessage: "state_message",
},
RunPageURL: "run_url",
}
assert.Equal(t, "-0001-11-30 00:00:00 run_name TERMINATED SUCCESS state_message run_url", event.String())
}

// TestJobProgressEventLoggerErrorOnIncompatibleSettings verifies that inplace
// mode is rejected when logging is enabled and the log file is stderr.
func TestJobProgressEventLoggerErrorOnIncompatibleSettings(t *testing.T) {
	const wantMessage = "inplace progress logging cannot be used when log-file is stderr"

	_, err := NewJobProgressLogger(flags.ModeInplace, "info", "stderr")

	assert.ErrorContains(t, err, wantMessage)
}

// TestInplaceJobsProgressLoggerCreatedWhenLoggingDisabled verifies that inplace
// mode is accepted with a stderr log file as long as the log level is "disabled".
func TestInplaceJobsProgressLoggerCreatedWhenLoggingDisabled(t *testing.T) {
	const (
		level = "disabled"
		file  = "stderr"
	)

	_, err := NewJobProgressLogger(flags.ModeInplace, level, file)

	assert.NoError(t, err)
}

// TestInplaceJobsProgressLoggerCreatedWhenLogFileIsNotStderr verifies that
// inplace mode is accepted with logging enabled when logs go somewhere other
// than stderr.
func TestInplaceJobsProgressLoggerCreatedWhenLogFileIsNotStderr(t *testing.T) {
	const (
		level = "info"
		file  = "stdout"
	)

	_, err := NewJobProgressLogger(flags.ModeInplace, level, file)

	assert.NoError(t, err)
}
13 changes: 10 additions & 3 deletions bundle/run/options.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
package run

import flag "github.com/spf13/pflag"
import (
"github.com/databricks/bricks/libs/flags"
flag "github.com/spf13/pflag"
)

type Options struct {
Job JobOptions
Pipeline PipelineOptions
Job JobOptions
Pipeline PipelineOptions
ProgressFormat flags.ProgressLogFormat
Comment thread
shreyas-goenka marked this conversation as resolved.
Outdated
}

// Define registers the run options on the given flag set: first the job
// options, then the pipeline options, and finally the --progress-format flag.
func (o *Options) Define(fs *flag.FlagSet) {
o.Job.Define(fs)
o.Pipeline.Define(fs)

// Initialise the field before registering it, so the flag is bound to the
// value produced by flags.NewProgressLogFormat.
o.ProgressFormat = flags.NewProgressLogFormat()
fs.Var(&o.ProgressFormat, "progress-format", "format for progress logs (append, inplace, json)")
}
22 changes: 10 additions & 12 deletions cmd/root/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,25 @@ const (

func initializeLogger(ctx context.Context, cmd *cobra.Command) (context.Context, error) {
opts := slog.HandlerOptions{}
opts.Level = logLevel.Level()
opts.Level = flags.LogLevel.Level()
opts.AddSource = true
opts.ReplaceAttr = log.ReplaceAttrFunctions{
log.ReplaceLevelAttr,
log.ReplaceSourceAttr,
}.ReplaceAttr

// Open the underlying log file if the user configured an actual file to log to.
err := logFile.Open()
err := flags.LogFile.Open()
if err != nil {
return nil, err
}

var handler slog.Handler
switch logOutput {
case flags.OutputJSON:
handler = opts.NewJSONHandler(logFile.Writer())
handler = opts.NewJSONHandler(flags.LogFile.Writer())
case flags.OutputText:
handler = opts.NewTextHandler(logFile.Writer())
handler = opts.NewTextHandler(flags.LogFile.Writer())
default:
return nil, fmt.Errorf("invalid log output: %s", logOutput)
}
Expand All @@ -46,27 +46,25 @@ func initializeLogger(ctx context.Context, cmd *cobra.Command) (context.Context,
return log.NewContext(ctx, slog.Default()), nil
}

var logFile = flags.NewLogFileFlag()
var logLevel = flags.NewLogLevelFlag()
var logOutput = flags.OutputText

func init() {
// Configure defaults from environment, if applicable.
// If the provided value is invalid it is ignored.
if v, ok := os.LookupEnv(envBricksLogFile); ok {
logFile.Set(v)
flags.LogFile.Set(v)
}
if v, ok := os.LookupEnv(envBricksLogLevel); ok {
logLevel.Set(v)
flags.LogLevel.Set(v)
}
if v, ok := os.LookupEnv(envBricksLogFormat); ok {
logOutput.Set(v)
}

RootCmd.PersistentFlags().Var(&logFile, "log-file", "file to write logs to")
RootCmd.PersistentFlags().Var(&logLevel, "log-level", "log level")
RootCmd.PersistentFlags().Var(&flags.LogFile, "log-file", "file to write logs to")
RootCmd.PersistentFlags().Var(&flags.LogLevel, "log-level", "log level")
RootCmd.PersistentFlags().Var(&logOutput, "log-format", "log output format (text or json)")
RootCmd.RegisterFlagCompletionFunc("log-file", logFile.Complete)
RootCmd.RegisterFlagCompletionFunc("log-level", logLevel.Complete)
RootCmd.RegisterFlagCompletionFunc("log-file", flags.LogFile.Complete)
RootCmd.RegisterFlagCompletionFunc("log-level", flags.LogLevel.Complete)
RootCmd.RegisterFlagCompletionFunc("log-format", logOutput.Complete)
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ require (
github.com/hashicorp/terraform-json v0.16.0
golang.org/x/exp v0.0.0-20230310171629-522b1b587ee0
golang.org/x/sync v0.1.0
golang.org/x/term v0.6.0
)

require (
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ=
golang.org/x/term v0.6.0 h1:clScbb1cHjoCkyRbWwBEUZ5H/tIFu5TAXIqaZD0Gcjw=
golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
Expand Down
2 changes: 2 additions & 0 deletions libs/flags/log_file_flag.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/spf13/cobra"
)

var LogFile = NewLogFileFlag()

// Abstract over files that are already open (e.g. stderr) and
// files that need to be opened before use.
type logFile interface {
Expand Down
2 changes: 2 additions & 0 deletions libs/flags/log_level_flag.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"golang.org/x/exp/slog"
)

var LogLevel = NewLogLevelFlag()

var levels = map[string]slog.Level{
"trace": log.LevelTrace,
"debug": log.LevelDebug,
Expand Down
Loading