Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add --exit-if-inactive option to armadactl watch. #259

Merged
merged 11 commits into from
Dec 10, 2019
7 changes: 6 additions & 1 deletion cmd/armadactl/cmd/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"os"
"reflect"
"time"

Expand All @@ -20,6 +21,7 @@ import (
func init() {
rootCmd.AddCommand(watchCmd)
watchCmd.Flags().Bool("raw", false, "Output raw events")
watchCmd.Flags().Bool("exit-if-inactive", false, "Exit if there are no more active jobs")
}

// watchCmd represents the watch command
Expand All @@ -32,7 +34,7 @@ var watchCmd = &cobra.Command{

jobSetId := args[0]
raw, _ := cmd.Flags().GetBool("raw")

exit_on_inactive, _ := cmd.Flags().GetBool("exit-if-inactive")
log.Infof("Watching job set %s", jobSetId)

apiConnectionDetails := client.ExtractCommandlineArmadaApiConnectionDetails()
Expand All @@ -53,6 +55,9 @@ var watchCmd = &cobra.Command{
summary += fmt.Sprintf(" | event: %s, job id: %s", reflect.TypeOf(e), e.GetJobId())
log.Info(summary)
}
if exit_on_inactive && !state.HasActiveJobs() {
os.Exit(0)
itamarst marked this conversation as resolved.
Show resolved Hide resolved
}
return false
})
})
Expand Down
21 changes: 21 additions & 0 deletions e2e/test/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
"os/exec"
"strconv"
"testing"
"time"
Expand Down Expand Up @@ -42,6 +43,26 @@ func TestCanSubmitJob_ReceivingAllExpectedEvents(t *testing.T) {
})
}

func TestCanSubmitJob_ArmdactlWatchExitOnInactive(t *testing.T) {
skipIfIntegrationEnvNotPresent(t)
connDetails := connectionDetails()
util2.WithConnection(connDetails, func(connection *grpc.ClientConn) {
submitClient := api.NewSubmitClient(connection)
eventsClient := api.NewEventClient(connection)

jobRequest := createJobRequest("personal-anonymous")
createQueue(submitClient, jobRequest, t)

cmd := exec.Command("./bin/armadactl", "--armadaUrl="+connDetails.ArmadaUrl, "watch", "--exit-if-inactive", jobRequest.JobSetId)
err := cmd.Start()
assert.NoError(t, err)

submitJobsAndWatch(t, submitClient, eventsClient, jobRequest)
err = cmd.Wait()
assert.NoError(t, err)
})
}

func TestCanSubmitJob_KubernetesNamespacePermissionsAreRespected(t *testing.T) {
skipIfIntegrationEnvNotPresent(t)

Expand Down
14 changes: 14 additions & 0 deletions internal/client/domain/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ type JobInfo struct {

var statesToIncludeInSummary []JobStatus

// States where the job is still active, or might be active in the future:
var activeStates []JobStatus

func init() {
statesToIncludeInSummary = []JobStatus{
Queued,
Expand All @@ -37,6 +40,12 @@ func init() {
Failed,
Cancelled,
}
activeStates = []JobStatus{
Queued,
Leased,
Pending,
Running,
}
}

//WatchContext keeps track of the current state when processing a stream of events
Expand Down Expand Up @@ -115,6 +124,11 @@ func (context *WatchContext) GetNumberOfJobsInStates(states []JobStatus) int {
return numberOfJobs
}

// Return whether there are any jobs in active states:
func (context *WatchContext) HasActiveJobs() bool {
return context.GetNumberOfJobsInStates(activeStates) > 0
}

func updateJobInfo(info *JobInfo, event api.Event) {
switch typed := event.(type) {
case *api.JobSubmittedEvent:
Expand Down
26 changes: 26 additions & 0 deletions internal/client/domain/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,32 @@ func TestWatchContext_GetNumberOfJobsInStates(t *testing.T) {
assert.Equal(t, result, 1)
}

// Only jobs in active states mark the context as having active jobs
func TestWatchContext_HasActiveJobs(t *testing.T) {
watchContext := NewWatchContext()

watchContext.ProcessEvent(&api.JobQueuedEvent{JobId: "1"})
assert.Equal(t, watchContext.HasActiveJobs(), true)

watchContext.ProcessEvent(&api.JobCancelledEvent{JobId: "1"})
assert.Equal(t, watchContext.HasActiveJobs(), false)

watchContext.ProcessEvent(&api.JobPendingEvent{JobId: "1"})
assert.Equal(t, watchContext.HasActiveJobs(), true)

watchContext.ProcessEvent(&api.JobSucceededEvent{JobId: "1"})
assert.Equal(t, watchContext.HasActiveJobs(), false)

watchContext.ProcessEvent(&api.JobLeasedEvent{JobId: "1"})
assert.Equal(t, watchContext.HasActiveJobs(), true)

watchContext.ProcessEvent(&api.JobRunningEvent{JobId: "1"})
assert.Equal(t, watchContext.HasActiveJobs(), true)

watchContext.ProcessEvent(&api.JobFailedEvent{JobId: "1"})
assert.Equal(t, watchContext.HasActiveJobs(), false)
}

func TestWatchContext_GetNumberOfJobsInStates_IsCorrectlyUpdatedOnUpdateToExistingJobState(t *testing.T) {
watchContext := NewWatchContext()

Expand Down
2 changes: 1 addition & 1 deletion makefile
Original file line number Diff line number Diff line change
Expand Up @@ -82,5 +82,5 @@ tests-e2e: build-docker e2e-start-cluster
}
trap tearDown EXIT
echo -e "\nrunning test:"
INTEGRATION_ENABLED=true go test ./e2e/test/... -count=1
INTEGRATION_ENABLED=true PATH=${PATH}:./bin go test -v ./e2e/test/... -count=1