Skip to content
This repository has been archived by the owner on Jan 8, 2024. It is now read-only.

Commit

Permalink
Merge pull request #3300 from hashicorp/static-runner-par
Browse files Browse the repository at this point in the history
runner: static runners accept multiple jobs in parallel
  • Loading branch information
mitchellh authored May 1, 2022
2 parents 63c639f + 70724c9 commit c1dcceb
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 1 deletion.
5 changes: 5 additions & 0 deletions .changelog/3300.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
```release-note:improvement
runner: runners will now accept and execute multiple jobs concurrently
if multiple jobs are available. On-demand runners continue to execute exactly
one job since they are purpose launched for single job execution.
```
31 changes: 30 additions & 1 deletion internal/cli/runner_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"io/ioutil"
"net"
"runtime"
"time"

"github.com/hashicorp/go-hclog"
Expand Down Expand Up @@ -53,6 +54,9 @@ type RunnerAgentCommand struct {

// Labels for the runner.
flagLabels map[string]string

// The amount of concurrent jobs that can be running.
flagConcurrency int
}

// This is how long a runner in ODR mode will wait for its job assignment before
Expand All @@ -76,6 +80,12 @@ func (c *RunnerAgentCommand) Run(args []string) int {

plugin.InsideODR = c.flagODR

// Flag defaults
if c.flagConcurrency < 1 {
log.Warn("concurrency flag less than 1 has no effect, using 1")
c.flagConcurrency = 1
}

// Connect to the server
log.Info("sourcing credentials and connecting to the Waypoint server")
conn, err := serverclient.Connect(ctx,
Expand Down Expand Up @@ -227,9 +237,16 @@ func (c *RunnerAgentCommand) Run(args []string) int {
go func() {
defer cancel()

// In non-ODR mode, we accept many jobs in parallel.
if !c.flagODR {
runner.AcceptParallel(ctx, c.flagConcurrency)
return
}

// In ODR mode, we accept a single job.
for {
err := runner.Accept(ctx)
if err == nil && c.flagODR {
if err == nil {
log.Debug("handled our one job in ODR mode, exiting")
return
}
Expand Down Expand Up @@ -344,6 +361,18 @@ func (c *RunnerAgentCommand) Flags() *flag.Sets {
Target: &c.flagLabels,
Usage: "Labels to set for this runner in 'k=v' format. Can be specified multiple times.",
})

f.IntVar(&flag.IntVar{
Name: "concurrency",
Target: &c.flagConcurrency,
Usage: "The number of concurrent jobs that can be running at one time. " +
"This has no effect if `-odr` is set. A value of less than 1 will " +
"default to 1.",

// Most jobs that a non-ODR runner runs are IO bound, so we use
// just a heuristic here of allowing some multiple above the CPUs.
Default: runtime.NumCPU() * 3,
})
})
}

Expand Down
25 changes: 25 additions & 0 deletions internal/runner/accept.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,31 @@ import (

var heartbeatDuration = 5 * time.Second

// AcceptParallel allows up to count jobs to be accepted and executing
// concurrently.
func (r *Runner) AcceptParallel(ctx context.Context, count int) {
// Create a new cancellable context so we can stop all the goroutines
// when one exits. We do this because if one exits, its likely that the
// unrecoverable error exists in all.
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Start up all the goroutines
r.logger.Info("accepting jobs concurrently", "count", count)
var wg sync.WaitGroup
wg.Add(count)
for i := 0; i < count; i++ {
go func() {
defer cancel()
defer wg.Done()
r.AcceptMany(ctx)
}()
}

// Wait for them to exit
wg.Wait()
}

// AcceptMany will accept jobs and execute them on after another as they are accepted.
// This is meant to be run in a goroutine and reports its own errors via r's logger.
func (r *Runner) AcceptMany(ctx context.Context) {
Expand Down
78 changes: 78 additions & 0 deletions internal/runner/accept_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,84 @@ func TestRunnerAccept_jobHcl(t *testing.T) {
require.Equal(pb.Job_Config_JOB, job.Config.Source)
}

func TestRunnerAcceptParallel(t *testing.T) {
require := require.New(t)
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)

// Setup our runner
client := singleprocess.TestServer(t)
runner := TestRunner(t, WithClient(client))
require.NoError(runner.Start(ctx))

// Block our noop jobs so we can inspect their state
noopCh := make(chan struct{})
runner.noopCh = noopCh

// Initialize our app
singleprocess.TestApp(t, client, serverptypes.TestJobNew(t, nil).Application)

// Queue jobs
queueResp, err := client.QueueJob(ctx, &pb.QueueJobRequest{
Job: serverptypes.TestJobNew(t, &pb.Job{
Workspace: &pb.Ref_Workspace{Workspace: "w1"},
}),
})
require.NoError(err)
jobId_1 := queueResp.JobId

queueResp, err = client.QueueJob(ctx, &pb.QueueJobRequest{
Job: serverptypes.TestJobNew(t, &pb.Job{
Workspace: &pb.Ref_Workspace{Workspace: "w2"},
}),
})
require.NoError(err)
jobId_2 := queueResp.JobId

// Accept should complete
doneCh := make(chan struct{})
go func() {
defer close(doneCh)
runner.AcceptParallel(ctx, 2)
}()

// Both jobs should be running at once eventually
require.Eventually(func() bool {
job, err := client.GetJob(ctx, &pb.GetJobRequest{JobId: jobId_1})
require.NoError(err)
if job.State != pb.Job_RUNNING {
return false
}

job, err = client.GetJob(ctx, &pb.GetJobRequest{JobId: jobId_2})
require.NoError(err)
return job.State == pb.Job_RUNNING
}, 3*time.Second, 10*time.Millisecond)

// Jobs should complete
close(noopCh)
require.Eventually(func() bool {
job, err := client.GetJob(ctx, &pb.GetJobRequest{JobId: jobId_1})
require.NoError(err)
if job.State != pb.Job_SUCCESS {
return false
}

job, err = client.GetJob(ctx, &pb.GetJobRequest{JobId: jobId_2})
require.NoError(err)
return job.State == pb.Job_SUCCESS
}, 3*time.Second, 10*time.Millisecond)

// Loop should exit
cancel()
select {
case <-time.After(2 * time.Second):
t.Fatal("accept should exit")

default:
}
}

// testGitFixture MUST be called before TestRunner since TestRunner
// changes our working directory.
func testGitFixture(t *testing.T, n string) string {
Expand Down
1 change: 1 addition & 0 deletions website/content/commands/runner-agent.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,6 @@ not generally recommended.
- `-cookie=<string>` - The cookie value of the server to validate API requests. This is required for runner adoption. If you do not already have a runner token, this must be set.
- `-state-dir=<string>` - Directory to store state between restarts. This is optional. If this is set, then a runner can restart without re-triggering the adoption process.
- `-label=<key=value>` - Labels to set for this runner in 'k=v' format. Can be specified multiple times.
- `-concurrency=<int>` - The number of concurrent jobs that can be running at one time. This has no effect if `-odr` is set. A value of less than 1 will default to 1.

@include "commands/runner-agent_more.mdx"

0 comments on commit c1dcceb

Please sign in to comment.