Skip to content
This repository has been archived by the owner on Jan 8, 2024. It is now read-only.

WatchTask for Nomad Task Launcher #3797

Merged
merged 11 commits into from
Sep 7, 2022
3 changes: 3 additions & 0 deletions .changelog/3797.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
plugin/nomad: Implement WatchTask plugin for Nomad task launcher. Store Nomad on-demand runner logs in the job system.
```
100 changes: 99 additions & 1 deletion builtin/nomad/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/hashicorp/waypoint-plugin-sdk/component"
"github.com/hashicorp/waypoint-plugin-sdk/docs"
"github.com/hashicorp/waypoint-plugin-sdk/terminal"
)

// TaskLauncher implements the TaskLauncher plugin interface to support
Expand Down Expand Up @@ -286,9 +287,106 @@ func (p *TaskLauncher) StartTask(
func (p *TaskLauncher) WatchTask(
ctx context.Context,
log hclog.Logger,
ui terminal.UI,
ti *TaskInfo,
) (*component.TaskResult, error) {
return nil, status.Errorf(codes.Unimplemented, "WatchTask not implemented")
// We'll query for the allocation in the namespace of our task launcher
queryOpts := &api.QueryOptions{Namespace: p.config.Namespace}

// Accumulate our result on this
var result component.TaskResult
client, err := getNomadClient()
if err != nil {
return nil, err
}
allocs, _, err := client.Jobs().Allocations(ti.Id, true, queryOpts)
if err != nil {
return nil, err
}

if len(allocs) != 1 {
log.Error("Invalid # of allocs for ODR job.")
return nil, status.Error(codes.Internal, "there should be one allocation in the job")
paladin-devops marked this conversation as resolved.
Show resolved Hide resolved
}
alloc, _, err := client.Allocations().Info(allocs[0].ID, queryOpts)
if err != nil {
log.Error("Failed to get info for alloc "+allocs[0].ID+". Error: %s", err.Error())
paladin-devops marked this conversation as resolved.
Show resolved Hide resolved
return nil, err
}
tg := alloc.GetTaskGroup()
if len(tg.Tasks) != 1 {
return nil, status.Error(codes.Internal, "there should be one task in the allocation")
}
task := tg.Tasks[0]

// We'll give the ODR 5 minutes to start up
// TODO: Make this configurable
ctx, cancel := context.WithTimeout(ctx, time.Minute*time.Duration(5))
defer cancel()
ticker := time.NewTicker(5 * time.Second)
state := "pending"
for state == "pending" {
select {
case <-ticker.C:
case <-ctx.Done(): // cancelled
return nil, status.Errorf(codes.Aborted, "Context cancelled from timeout waiting for ODR task to start %s", ctx.Err())
paladin-devops marked this conversation as resolved.
Show resolved Hide resolved
}
alloc, _, err := client.Allocations().Info(allocs[0].ID, queryOpts)
if err != nil {
log.Error("Failed to get info for alloc "+allocs[0].ID+". Error: %s", err.Error())
paladin-devops marked this conversation as resolved.
Show resolved Hide resolved
return nil, err
}
allocTask, ok := alloc.TaskStates[task.Name]
if !ok {
return nil, status.Error(codes.Unknown, "ODR task not in alloc")
}
state = allocTask.State
}

// Only follow the logs if our task is still alive
follow := true
if state == "dead" {
follow = false
}

log.Debug("Getting logs for alloc: " + alloc.Name + ", task: " + task.Name)
paladin-devops marked this conversation as resolved.
Show resolved Hide resolved
ch := make(chan struct{})
logStream, errChan := client.AllocFS().Logs(alloc, follow, task.Name, "stderr", "", 0, ch, queryOpts)
READ_LOGS:
for {
select {
case data := <-logStream:
if data == nil {
// check if task is dead, if it is dead, return
alloc, _, err := client.Allocations().Info(allocs[0].ID, queryOpts)
if err != nil {
log.Error("Failed to get info for alloc "+allocs[0].ID+". Error: %s", err.Error())
paladin-devops marked this conversation as resolved.
Show resolved Hide resolved
return nil, err
}
allocTask, ok := alloc.TaskStates[task.Name]
if !ok {
return nil, status.Error(codes.Unknown, "ODR task not in alloc")
}
state = allocTask.State
if state != "running" {
// if the task is no longer running, exit
result.ExitCode = 0
return &result, nil
}

break READ_LOGS
}
message := string(data.Data)
log.Info(message)
ui.Output(message)
case err := <-errChan:
log.Error("Error reading logs from alloc: %q", err.Error())
paladin-devops marked this conversation as resolved.
Show resolved Hide resolved
return nil, err
}
}

result.ExitCode = 0
return &result, nil
}

// Compile-time assertion that *TaskLauncher implements component.TaskLauncher.
var _ component.TaskLauncher = (*TaskLauncher)(nil)