From bb347a1dc56d0fb9cb878e1f73bec830c308d807 Mon Sep 17 00:00:00 2001
From: EC2 Default User
Date: Tue, 8 Oct 2024 14:15:42 +0000
Subject: [PATCH] start: allow users to call job start command to start up a previously stopped job

---
 command/commands.go       |  10 ++
 command/job_start.go      | 256 ++++++++++++++++++++++++++++++++++++++
 command/job_start_test.go | 177 +++++++++++++++++++++++++++++
 3 files changed, 443 insertions(+)
 create mode 100644 command/job_start.go
 create mode 100644 command/job_start_test.go

diff --git a/command/commands.go b/command/commands.go
index 7a46c08d4844..9fb41f4ad98d 100644
--- a/command/commands.go
+++ b/command/commands.go
@@ -521,6 +521,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory {
 				Meta: meta,
 			}, nil
 		},
+		"job start": func() (cli.Command, error) {
+			return &JobStartCommand{
+				Meta: meta,
+			}, nil
+		},
 		"job validate": func() (cli.Command, error) {
 			return &JobValidateCommand{
 				Meta: meta,
@@ -1071,6 +1076,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory {
 				Meta: meta,
 			}, nil
 		},
+		"start": func() (cli.Command, error) {
+			return &JobStartCommand{
+				Meta: meta,
+			}, nil
+		},
 		"system": func() (cli.Command, error) {
 			return &SystemCommand{
 				Meta: meta,
diff --git a/command/job_start.go b/command/job_start.go
new file mode 100644
index 000000000000..43479515343e
--- /dev/null
+++ b/command/job_start.go
@@ -0,0 +1,256 @@
+// Copyright (c) HashiCorp, Inc.
+// SPDX-License-Identifier: BUSL-1.1
+
+package command
+
+import (
+	"fmt"
+	"os"
+	"strings"
+	"sync"
+
+	"github.com/hashicorp/nomad/api"
+	"github.com/hashicorp/nomad/api/contexts"
+	"github.com/posener/complete"
+)
+
+// JobStartCommand implements "nomad job start" (alias "nomad start"), which
+// starts a stopped job by reverting it to its most recent running version.
+type JobStartCommand struct {
+	Meta
+}
+
+// Help returns the long-form usage text for the command.
+func (c *JobStartCommand) Help() string {
+	helpText := `
+Usage: nomad job start [options] <job>
+Alias: nomad start
+
+  Start an existing stopped job. This command is used to start a previously stopped job's
+  most recent running version up. Upon successful start, an interactive
+  monitor session will start to display log lines as the job starts up its
+  allocations based on its most recent running version. It is safe to exit the monitor
+  early using ctrl+c.
+
+  When ACLs are enabled, this command requires a token with the 'submit-job'
+  and 'read-job' capabilities for the job's namespace. The 'list-jobs'
+  capability is required to run the command with job prefixes instead of exact
+  job IDs.
+
+
+General Options:
+
+  ` + generalOptionsUsage(usageOptsDefault) + `
+
+Start Options:
+
+  -detach
+    Return immediately instead of entering monitor mode. After the
+    job start command is submitted, a new evaluation ID is printed to the
+    screen, which can be used to examine the evaluation using the eval-status
+    command.
+
+  -consul-token
+    The Consul token used to verify that the caller has access to the Service
+    Identity policies associated in the targeted version of the job.
+
+  -vault-token
+    The Vault token used to verify that the caller has access to the Vault
+    policies in the targeted version of the job.
+
+  -verbose
+    Display full information.
+`
+	return strings.TrimSpace(helpText)
+}
+
+// Synopsis returns the one-line description shown in command listings.
+func (c *JobStartCommand) Synopsis() string {
+	return "Start a stopped job"
+}
+
+// AutocompleteFlags returns the flag predictors used for shell completion.
+func (c *JobStartCommand) AutocompleteFlags() complete.Flags {
+	return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient),
+		complete.Flags{
+			"-detach":       complete.PredictNothing,
+			"-consul-token": complete.PredictAnything,
+			"-vault-token":  complete.PredictAnything,
+			"-verbose":      complete.PredictNothing,
+		})
+}
+
+// AutocompleteArgs returns a predictor that completes job IDs by running a
+// prefix search for the last typed argument.
+func (c *JobStartCommand) AutocompleteArgs() complete.Predictor {
+	return complete.PredictFunc(func(a complete.Args) []string {
+		client, err := c.Meta.Client()
+		if err != nil {
+			return nil
+		}
+
+		resp, _, err := client.Search().PrefixSearch(a.Last, contexts.Jobs, nil)
+		if err != nil {
+			return []string{}
+		}
+		return resp.Matches[contexts.Jobs]
+	})
+}
+
+// Name returns the canonical name of the command.
+func (c *JobStartCommand) Name() string { return "job start" }
+
+// Run parses the flags and starts every job named on the command line, each
+// in its own goroutine. It returns 0 only if every job started successfully.
+func (c *JobStartCommand) Run(args []string) int {
+	var detach, verbose bool
+	var consulToken, vaultToken string
+
+	flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
+	flags.Usage = func() { c.Ui.Output(c.Help()) }
+	flags.BoolVar(&detach, "detach", false, "")
+	flags.BoolVar(&verbose, "verbose", false, "")
+	flags.StringVar(&consulToken, "consul-token", "", "")
+	flags.StringVar(&vaultToken, "vault-token", "", "")
+
+	if err := flags.Parse(args); err != nil {
+		return 1
+	}
+
+	// Check that we got at least one job
+	args = flags.Args()
+	if len(args) < 1 {
+		c.Ui.Error("This command takes at least one argument: <job>")
+		c.Ui.Error(commandErrorText(c))
+		return 1
+	}
+
+	jobIDs := make([]string, 0, len(args))
+	for _, jobID := range args {
+		jobIDs = append(jobIDs, strings.TrimSpace(jobID))
+	}
+
+	// Get the HTTP client
+	client, err := c.Meta.Client()
+	if err != nil {
+		c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err))
+		return 1
+	}
+
+	// Fall back to the environment for tokens not passed as flags. This is
+	// resolved once, before any goroutine starts, so the workers only ever
+	// read these values.
+	if consulToken == "" {
+		consulToken = os.Getenv("CONSUL_HTTP_TOKEN")
+	}
+	if vaultToken == "" {
+		vaultToken = os.Getenv("VAULT_TOKEN")
+	}
+
+	// Truncate the id unless full length is requested
+	length := shortId
+	if verbose {
+		length = fullId
+	}
+
+	// Each goroutine sends exactly one status, so a buffer of len(jobIDs)
+	// guarantees no send ever blocks.
+	statusCh := make(chan int, len(jobIDs))
+	var wg sync.WaitGroup
+
+	for _, jobID := range jobIDs {
+		jobID := jobID // per-iteration copy for toolchains before Go 1.22
+
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			statusCh <- c.startJob(client, jobID, consulToken, vaultToken, detach, length)
+		}()
+	}
+
+	// Users will still see errors, if any, while we wait for the goroutines
+	// to finish processing.
+	wg.Wait()
+
+	// Close the channel so the range statement below terminates.
+	close(statusCh)
+
+	// Return a non-zero exit code if even a single job start fails.
+	for status := range statusCh {
+		if status != 0 {
+			return status
+		}
+	}
+	return 0
+}
+
+// startJob starts a single stopped job by reverting it to its most recent
+// running version and returns the exit code for that job. Failures are
+// reported through c.Ui. jobPrefix may be an exact job ID or a prefix of one.
+func (c *JobStartCommand) startJob(client *api.Client, jobPrefix, consulToken, vaultToken string, detach bool, length int) int {
+	// Check if the job exists and has been stopped
+	jobID, namespace, err := c.JobIDByPrefix(client, jobPrefix, nil)
+	if err != nil {
+		c.Ui.Error(err.Error())
+		return 1
+	}
+	job, err := c.JobByPrefix(client, jobID, nil)
+	if err != nil {
+		c.Ui.Error(err.Error())
+		return 1
+	}
+	if *job.Status != "dead" {
+		c.Ui.Error(fmt.Sprintf("Job %v has not been stopped and has following status: %v", *job.Name, *job.Status))
+		return 1
+	}
+
+	// Get all versions associated to current job, using the resolved job ID
+	// rather than the prefix the user typed.
+	q := &api.QueryOptions{Namespace: namespace}
+	versions, _, _, err := client.Jobs().Versions(jobID, true, q)
+	if err != nil {
+		c.Ui.Error(fmt.Sprintf("Error retrieving job versions: %s", err))
+		return 1
+	}
+
+	// Find the most recent running version for this job and remember its
+	// version number, which is what Revert expects.
+	// NOTE(review): assumes Versions lists the newest version first - confirm.
+	var chosenVersion uint64
+	versionAvailable := false
+	for _, v := range versions {
+		if v == nil || v.Status == nil || v.Version == nil {
+			continue
+		}
+		if *v.Status == "running" {
+			chosenVersion = *v.Version
+			versionAvailable = true
+			break
+		}
+	}
+	if !versionAvailable {
+		c.Ui.Error(fmt.Sprintf("No previous running versions of job %v", *job.Name))
+		return 1
+	}
+
+	// Revert to most recent running version!
+	m := &api.WriteOptions{Namespace: namespace}
+	resp, _, err := client.Jobs().Revert(jobID, chosenVersion, nil, m, consulToken, vaultToken)
+	if err != nil {
+		c.Ui.Error(fmt.Sprintf("Error starting job %v at version %d: %s", *job.Name, chosenVersion, err))
+		return 1
+	}
+
+	// Nothing to do
+	if resp.EvalID == "" {
+		return 0
+	}
+
+	if detach {
+		c.Ui.Output("Evaluation ID: " + resp.EvalID)
+		return 0
+	}
+
+	mon := newMonitor(c.Ui, client, length)
+	return mon.monitor(resp.EvalID)
+}
diff --git a/command/job_start_test.go b/command/job_start_test.go
new file mode 100644
index 000000000000..aac3c018728b
--- /dev/null
+++ b/command/job_start_test.go
@@ -0,0 +1,177 @@
+// Copyright (c) HashiCorp, Inc.
+// SPDX-License-Identifier: BUSL-1.1
+
+package command
+
+import (
+	"encoding/json"
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/hashicorp/nomad/ci"
+	"github.com/hashicorp/nomad/command/agent"
+	"github.com/hashicorp/nomad/helper/pointer"
+	"github.com/hashicorp/nomad/helper/uuid"
+	"github.com/hashicorp/nomad/nomad/mock"
+	"github.com/hashicorp/nomad/nomad/structs"
+	"github.com/mitchellh/cli"
+	"github.com/posener/complete"
+	"github.com/shoenig/test/must"
+)
+
+var _ cli.Command = (*JobStartCommand)(nil)
+
+// runStartTestJob writes a small test job with the given ID to dir as JSON
+// and submits it with "job run", failing the test if the submission fails.
+func runStartTestJob(t *testing.T, ui *cli.MockUi, addr, dir, jobID string) {
+	t.Helper()
+
+	job := testJob(jobID)
+	job.TaskGroups[0].Tasks[0].Resources.MemoryMB = pointer.Of(16)
+	job.TaskGroups[0].Tasks[0].Resources.DiskMB = pointer.Of(32)
+	job.TaskGroups[0].Tasks[0].Resources.CPU = pointer.Of(10)
+	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
+		"run_for": "30s",
+	}
+
+	jobJSON, err := json.MarshalIndent(job, "", " ")
+	must.NoError(t, err)
+
+	jobFile := filepath.Join(dir, jobID+".nomad")
+	must.NoError(t, os.WriteFile(jobFile, jobJSON, 0o644))
+
+	cmd := &JobRunCommand{Meta: Meta{Ui: ui}}
+	code := cmd.Run([]string{"-address", addr, "-json", jobFile})
+	must.Zero(t, code,
+		must.Sprintf("job run stdout: %s", ui.OutputWriter.String()),
+		must.Sprintf("job run stderr: %s", ui.ErrorWriter.String()),
+	)
+}
+
+func TestJobStartCommand_Fails(t *testing.T) {
+	ci.Parallel(t)
+
+	srv, _, addr := testServer(t, true, func(c *agent.Config) {
+		c.DevMode = true
+	})
+	defer srv.Shutdown()
+
+	ui := cli.NewMockUi()
+	cmd := &JobStartCommand{Meta: Meta{Ui: ui}}
+
+	// Fails on misuse
+	code := cmd.Run([]string{"-bad", "-flag"})
+	must.One(t, code)
+
+	out := ui.ErrorWriter.String()
+	must.StrContains(t, out, "flag provided but not defined: -bad")
+
+	ui.ErrorWriter.Reset()
+
+	// Fails on nonexistent job ID
+	code = cmd.Run([]string{"-address=" + addr, "non-existent"})
+	must.One(t, code)
+
+	out = ui.ErrorWriter.String()
+	must.StrContains(t, out, "No job(s) with prefix or ID")
+
+	ui.ErrorWriter.Reset()
+
+	// Fails on connection failure
+	code = cmd.Run([]string{"-address=nope", "n"})
+	must.One(t, code)
+
+	out = ui.ErrorWriter.String()
+	must.StrContains(t, out, "Error querying job prefix")
+
+	ui.ErrorWriter.Reset()
+
+	// Fails on attempting to start a job that's not been stopped
+	jobID := uuid.Generate()
+	runStartTestJob(t, ui, addr, t.TempDir(), jobID)
+
+	code = cmd.Run([]string{"-address=" + addr, jobID})
+	must.One(t, code)
+
+	out = ui.ErrorWriter.String()
+	must.StrContains(t, out, "has not been stopped and has following status:")
+}
+
+func TestStartCommand_ManyJobs(t *testing.T) {
+	ci.Parallel(t)
+
+	srv, _, addr := testServer(t, true, func(c *agent.Config) {
+		c.DevMode = true
+	})
+	defer srv.Shutdown()
+
+	// the number of jobs we want to run
+	numJobs := 10
+
+	// record cli output
+	ui := cli.NewMockUi()
+
+	// create and run a handful of jobs; the job files live in a directory
+	// that the testing package removes when the test finishes
+	jobDir := t.TempDir()
+	jobIDs := make([]string, 0, numJobs)
+	for i := 0; i < numJobs; i++ {
+		jobID := uuid.Generate()
+		jobIDs = append(jobIDs, jobID)
+		runStartTestJob(t, ui, addr, jobDir, jobID)
+	}
+
+	// helper for stopping a list of jobs
+	stop := func(args ...string) (stdout string, stderr string, code int) {
+		cmd := &JobStopCommand{Meta: Meta{Ui: ui}}
+		code = cmd.Run(args)
+		return ui.OutputWriter.String(), ui.ErrorWriter.String(), code
+	}
+
+	// helper for starting a list of jobs
+	start := func(args ...string) (stdout string, stderr string, code int) {
+		cmd := &JobStartCommand{Meta: Meta{Ui: ui}}
+		code = cmd.Run(args)
+		return ui.OutputWriter.String(), ui.ErrorWriter.String(), code
+	}
+
+	// stop all jobs in one command
+	args := []string{"-address", addr, "-detach"}
+	args = append(args, jobIDs...)
+	stdout, stderr, code := stop(args...)
+	must.Zero(t, code,
+		must.Sprintf("job stop stdout: %s", stdout),
+		must.Sprintf("job stop stderr: %s", stderr),
+	)
+
+	// start all jobs again in one command
+	stdout, stderr, code = start(args...)
+	must.Zero(t, code,
+		must.Sprintf("job start stdout: %s", stdout),
+		must.Sprintf("job start stderr: %s", stderr),
+	)
+}
+
+func TestStartCommand_AutocompleteArgs(t *testing.T) {
+	ci.Parallel(t)
+
+	srv, _, url := testServer(t, true, nil)
+	defer srv.Shutdown()
+
+	ui := cli.NewMockUi()
+	cmd := &JobStartCommand{Meta: Meta{Ui: ui, flagAddress: url}}
+
+	// Create a fake job
+	state := srv.Agent.Server().State()
+	j := mock.Job()
+	must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 1000, nil, j))
+
+	prefix := j.ID[:len(j.ID)-5]
+	args := complete.Args{Last: prefix}
+	predictor := cmd.AutocompleteArgs()
+
+	res := predictor.Predict(args)
+	must.Len(t, 1, res)
+	must.Eq(t, j.ID, res[0])
+}