Skip to content
Merged
127 changes: 127 additions & 0 deletions bundle/run/args.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package run

import (
"fmt"
"strings"

"github.com/spf13/cobra"
)

// argsHandler defines the (unexported) interface for the runners in this
// package to implement to handle context-specific positional arguments.
//
// For jobs, this means:
// - If a job uses job parameters: parse positional arguments into key-value pairs
// and pass them as job parameters.
// - If a job does not use job parameters AND only has Spark Python tasks:
// pass through the positional arguments as a list of Python parameters.
// - If a job does not use job parameters AND only has notebook tasks:
// parse arguments into key-value pairs and pass them as notebook parameters.
// - ...
//
// In all cases, we may be able to provide context-aware argument completions.
type argsHandler interface {
// Parse additional positional arguments.
ParseArgs(args []string, opts *Options) error

// Complete additional positional arguments.
CompleteArgs(args []string, toComplete string) ([]string, cobra.ShellCompDirective)
}

// nopArgsHandler is a no-op implementation of [argsHandler].
// It returns an error if any positional arguments are present and doesn't complete anything.
type nopArgsHandler struct{}

func (nopArgsHandler) ParseArgs(args []string, opts *Options) error {
if len(args) == 0 {
return nil
}

return fmt.Errorf("received %d unexpected positional arguments", len(args))
}

func (nopArgsHandler) CompleteArgs(args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
return nil, cobra.ShellCompDirectiveNoFileComp
}

// argsToKeyValueMap parses key-value pairs from the specified arguments.
//
// It accepts these formats:
// - `--key=value`
// - `--key`, `value`
//
// Remaining arguments are returned as-is.
func argsToKeyValueMap(args []string) (map[string]string, []string) {
kv := make(map[string]string)
key := ""
tail := args

for i, arg := range args {
// If key is set; use the next argument as value.
if key != "" {
kv[key] = arg
key = ""
tail = args[i+1:]
continue
}

if strings.HasPrefix(arg, "--") {
Comment thread
lennartkats-db marked this conversation as resolved.
parts := strings.SplitN(arg[2:], "=", 2)
if len(parts) == 2 {
kv[parts[0]] = parts[1]
tail = args[i+1:]
continue
}

// Use this argument as key, the next as value.
key = parts[0]
continue
}

// If we cannot interpret it; return here.
break
}

return kv, tail
Comment thread
pietern marked this conversation as resolved.
}

// genericParseKeyValueArgs parses key-value pairs from the specified arguments.
// If there are any positional arguments left, it returns an error.
func genericParseKeyValueArgs(args []string) (map[string]string, error) {
kv, args := argsToKeyValueMap(args)
if len(args) > 0 {
return nil, fmt.Errorf("received %d unexpected positional arguments", len(args))
}

return kv, nil
}

// genericCompleteKeyValueArgs completes key-value pairs from the specified arguments.
// Completion options that are already specified are skipped.
func genericCompleteKeyValueArgs(args []string, toComplete string, options []string) ([]string, cobra.ShellCompDirective) {
// If the string to complete contains an equals sign, then we are
// completing the value part (which we don't know here).
if strings.Contains(toComplete, "=") {
return nil, cobra.ShellCompDirectiveNoFileComp
}

// Remove already completed key/value pairs.
kv, args := argsToKeyValueMap(args)

// If the list of remaining args is empty, return possible completions.
if len(args) == 0 {
var completions []string
for _, option := range options {
// Skip options that have already been specified.
if _, ok := kv[option]; ok {
continue
}
completions = append(completions, fmt.Sprintf("--%s=", option))
}
// Note: we include cobra.ShellCompDirectiveNoSpace to suggest including
// the value part right after the equals sign.
return completions, cobra.ShellCompDirectiveNoFileComp | cobra.ShellCompDirectiveNoSpace
}

return nil, cobra.ShellCompDirectiveNoFileComp
}
114 changes: 114 additions & 0 deletions bundle/run/args_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package run

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestNopArgsHandler(t *testing.T) {
h := nopArgsHandler{}
opts := &Options{}

// No error if no positional arguments are passed.
err := h.ParseArgs([]string{}, opts)
assert.NoError(t, err)

// Error if any positional arguments are passed.
err = h.ParseArgs([]string{"foo"}, opts)
assert.EqualError(t, err, "received 1 unexpected positional arguments")

// No completions.
completions, _ := h.CompleteArgs([]string{}, "")
assert.Nil(t, completions)
}

func TestArgsToKeyValueMap(t *testing.T) {
for _, tc := range []struct {
input []string
expected map[string]string
tail []string
err error
}{
{
input: []string{"--foo=bar", "--baz", "qux"},
expected: map[string]string{
"foo": "bar",
"baz": "qux",
},
tail: []string{},
},
{
input: []string{"--foo=bar", "--baz", "qux", "tail"},
expected: map[string]string{
"foo": "bar",
"baz": "qux",
},
tail: []string{"tail"},
},
{
input: []string{"--foo=bar", "--baz", "qux", "tail", "--foo=bar"},
expected: map[string]string{
"foo": "bar",
"baz": "qux",
},
tail: []string{"tail", "--foo=bar"},
},
{
input: []string{"--foo=bar", "--baz=qux"},
expected: map[string]string{
"foo": "bar",
"baz": "qux",
},
tail: []string{},
},
{
input: []string{"--foo=bar", "--baz=--qux"},
expected: map[string]string{
"foo": "bar",
"baz": "--qux",
},
tail: []string{},
},
} {
actual, tail := argsToKeyValueMap(tc.input)
assert.Equal(t, tc.expected, actual)
assert.Equal(t, tc.tail, tail)
}
}

func TestGenericParseKeyValueArgs(t *testing.T) {
kv, err := genericParseKeyValueArgs([]string{"--foo=bar", "--baz", "qux"})
assert.NoError(t, err)
assert.Equal(t, map[string]string{
"foo": "bar",
"baz": "qux",
}, kv)

_, err = genericParseKeyValueArgs([]string{"--foo=bar", "--baz", "qux", "tail"})
assert.EqualError(t, err, "received 1 unexpected positional arguments")
}

func TestGenericCompleteKeyValueArgs(t *testing.T) {
var completions []string

// Complete nothing if there are no options.
completions, _ = genericCompleteKeyValueArgs([]string{}, ``, []string{})
assert.Empty(t, completions)

// Complete nothing if we're in the middle of a key-value pair (as single argument with equals sign).
completions, _ = genericCompleteKeyValueArgs([]string{}, `--foo=`, []string{`foo`, `bar`})
assert.Empty(t, completions)

// Complete nothing if we're in the middle of a key-value pair (as two arguments).
completions, _ = genericCompleteKeyValueArgs([]string{`--foo`}, ``, []string{`foo`, `bar`})
assert.Empty(t, completions)

// Complete if we're at the beginning.
completions, _ = genericCompleteKeyValueArgs([]string{}, ``, []string{`foo`, `bar`})
assert.Equal(t, []string{`--foo=`, `--bar=`}, completions)

// Complete if we have already one key-value pair.
completions, _ = genericCompleteKeyValueArgs([]string{`--foo=bar`}, ``, []string{`foo`, `bar`})
assert.Equal(t, []string{`--bar=`}, completions)
}
80 changes: 10 additions & 70 deletions bundle/run/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,77 +15,9 @@ import (
"github.com/databricks/cli/libs/log"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/fatih/color"
flag "github.com/spf13/pflag"
"github.com/spf13/cobra"
)

// JobOptions defines options for running a job.
type JobOptions struct {
dbtCommands []string
jarParams []string
notebookParams map[string]string
pipelineParams map[string]string
pythonNamedParams map[string]string
pythonParams []string
sparkSubmitParams []string
sqlParams map[string]string
}

func (o *JobOptions) Define(fs *flag.FlagSet) {
fs.StringSliceVar(&o.dbtCommands, "dbt-commands", nil, "A list of commands to execute for jobs with DBT tasks.")
fs.StringSliceVar(&o.jarParams, "jar-params", nil, "A list of parameters for jobs with Spark JAR tasks.")
fs.StringToStringVar(&o.notebookParams, "notebook-params", nil, "A map from keys to values for jobs with notebook tasks.")
fs.StringToStringVar(&o.pipelineParams, "pipeline-params", nil, "A map from keys to values for jobs with pipeline tasks.")
fs.StringToStringVar(&o.pythonNamedParams, "python-named-params", nil, "A map from keys to values for jobs with Python wheel tasks.")
fs.StringSliceVar(&o.pythonParams, "python-params", nil, "A list of parameters for jobs with Python tasks.")
fs.StringSliceVar(&o.sparkSubmitParams, "spark-submit-params", nil, "A list of parameters for jobs with Spark submit tasks.")
fs.StringToStringVar(&o.sqlParams, "sql-params", nil, "A map from keys to values for jobs with SQL tasks.")
}

func (o *JobOptions) validatePipelineParams() (*jobs.PipelineParams, error) {
if len(o.pipelineParams) == 0 {
return nil, nil
}

var defaultErr = fmt.Errorf("job run argument --pipeline-params only supports `full_refresh=<bool>`")
v, ok := o.pipelineParams["full_refresh"]
if !ok {
return nil, defaultErr
}

b, err := strconv.ParseBool(v)
if err != nil {
return nil, defaultErr
}

pipelineParams := &jobs.PipelineParams{
FullRefresh: b,
}

return pipelineParams, nil
}

func (o *JobOptions) toPayload(jobID int64) (*jobs.RunNow, error) {
pipelineParams, err := o.validatePipelineParams()
if err != nil {
return nil, err
}

payload := &jobs.RunNow{
JobId: jobID,

DbtCommands: o.dbtCommands,
JarParams: o.jarParams,
NotebookParams: o.notebookParams,
PipelineParams: pipelineParams,
PythonNamedParams: o.pythonNamedParams,
PythonParams: o.pythonParams,
SparkSubmitParams: o.sparkSubmitParams,
SqlParams: o.sqlParams,
}

return payload, nil
}

// Default timeout for waiting for a job run to complete.
var jobRunTimeout time.Duration = 24 * time.Hour

Expand Down Expand Up @@ -228,7 +160,7 @@ func (r *jobRunner) Run(ctx context.Context, opts *Options) (output.RunOutput, e
}

// construct request payload from cmd line flags args
req, err := opts.Job.toPayload(jobID)
req, err := opts.Job.toPayload(r.job, jobID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -344,3 +276,11 @@ func (r *jobRunner) convertPythonParams(opts *Options) error {

return nil
}

func (r *jobRunner) ParseArgs(args []string, opts *Options) error {
return r.posArgsHandler().ParseArgs(args, opts)
}

func (r *jobRunner) CompleteArgs(args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
return r.posArgsHandler().CompleteArgs(args, toComplete)
}
Loading