Skip to content
This repository has been archived by the owner on Jan 8, 2024. It is now read-only.

WatchTask for Nomad Task Launcher #3797

Merged
merged 11 commits into from
Sep 7, 2022
3 changes: 3 additions & 0 deletions .changelog/3797.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
plugin/nomad: Store Nomad on-demand runner logs in the job system
paladin-devops marked this conversation as resolved.
Show resolved Hide resolved
```
85 changes: 84 additions & 1 deletion builtin/nomad/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package nomad
import (
"context"
"crypto/rand"
"errors"
"fmt"
"strings"
"time"
Expand All @@ -15,6 +16,7 @@ import (

"github.com/hashicorp/waypoint-plugin-sdk/component"
"github.com/hashicorp/waypoint-plugin-sdk/docs"
"github.com/hashicorp/waypoint-plugin-sdk/terminal"
)

// TaskLauncher implements the TaskLauncher plugin interface to support
Expand Down Expand Up @@ -286,9 +288,90 @@ func (p *TaskLauncher) StartTask(
func (p *TaskLauncher) WatchTask(
ctx context.Context,
log hclog.Logger,
ui terminal.UI,
ti *TaskInfo,
) (*component.TaskResult, error) {
return nil, status.Errorf(codes.Unimplemented, "WatchTask not implemented")
// We'll query for the allocation in the namespace of our task launcher
queryOpts := &api.QueryOptions{Namespace: p.config.Namespace}

// Accumulate our result on this
var result component.TaskResult

if client, err := getNomadClient(); err != nil {
return nil, err
} else {
paladin-devops marked this conversation as resolved.
Show resolved Hide resolved
if allocs, _, err := client.Jobs().Allocations(ti.Id, true, queryOpts); err != nil {
log.Error("Failed to get allocations for ODR job: %s", ti.Id)
return nil, err
} else {
if len(allocs) != 1 {
log.Error("Invalid # of allocs for ODR job.")
return nil, errors.New("there should be one allocation in the job")
}
if alloc, _, err := client.Allocations().Info(allocs[0].ID, queryOpts); err != nil {
log.Error("Failed to get info for alloc "+allocs[0].ID+". Error: %s", err.Error())
return nil, err
} else {
tg := alloc.GetTaskGroup()
if len(tg.Tasks) != 1 {
return nil, errors.New("there should be one task in the allocation")
paladin-devops marked this conversation as resolved.
Show resolved Hide resolved
}
task := tg.Tasks[0]

// We'll give the ODR 5 minutes to start up
// TODO: Make this configurable
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hadn't planned on including this in the PR, but will give it a try!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will add in future PR!

ctx, cancel := context.WithTimeout(ctx, time.Minute*time.Duration(5))
defer cancel()
ticker := time.NewTicker(5 * time.Second)
state := "pending"
for state == "pending" {
select {
case <-ticker.C:
case <-ctx.Done(): // cancelled
return nil, status.Errorf(codes.Aborted, "Context cancelled from timeout waiting for ODR task to start %s", ctx.Err())
}
if alloc, _, err := client.Allocations().Info(allocs[0].ID, queryOpts); err != nil {
log.Error("Failed to get info for alloc "+allocs[0].ID+". Error: %s", err.Error())
return nil, err
} else {
allocTask, ok := alloc.TaskStates[task.Name]
if !ok {
return nil, errors.New("ODR task not in alloc")
}
state = allocTask.State
}
}

// Only follow the logs if our task is still alive
follow := true
if state == "dead" {
follow = false
}

log.Debug("Getting logs for alloc: " + alloc.Name + ", task: " + task.Name)
ch := make(chan struct{})
logStream, errChan := client.AllocFS().Logs(alloc, follow, task.Name, "stderr", "", 0, ch, queryOpts)
READ_LOGS:
for {
select {
case data := <-logStream:
if data == nil {
break READ_LOGS
paladin-devops marked this conversation as resolved.
Show resolved Hide resolved
}
message := string(data.Data)
log.Info(message)
ui.Output(message)
case err := <-errChan:
log.Error("Error reading logs from alloc: %q", err.Error())
return nil, err
}
}
}
}
}

result.ExitCode = 0
return &result, nil
}

var _ component.TaskLauncher = (*TaskLauncher)(nil)