-
Notifications
You must be signed in to change notification settings - Fork 32
/
utils.go
180 lines (154 loc) · 4.42 KB
/
utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
package tes
import (
"bytes"
"encoding/base64"
"errors"
"fmt"
"time"
"github.com/getlantern/deepcopy"
"github.com/rs/xid"
"google.golang.org/protobuf/encoding/protojson"
)
// Marshaler is the shared protojson configuration used by this package to
// render tasks as indented JSON.
var Marshaler = protojson.MarshalOptions{Indent: " "}
// MarshalToString renders a task as an indented JSON string using the
// package-level Marshaler.
//
// A nil task is rejected with an error. For a non-nil task the text comes from
// Marshaler.Format and the returned error is always nil.
func MarshalToString(t *Task) (string, error) {
	if t == nil {
		err := fmt.Errorf("can't marshal nil task")
		return "", err
	}
	text := Marshaler.Format(t)
	return text, nil
}
// Base64Encode serializes a task with the package-level Marshaler and returns
// the bytes encoded with the standard (padded) base64 encoding.
// A marshaling failure is returned unchanged together with an empty string.
func Base64Encode(t *Task) (string, error) {
	raw, err := Marshaler.Marshal(t)
	if err != nil {
		return "", err
	}
	return base64.StdEncoding.EncodeToString(raw), nil
}
// Base64Decode decodes a standard base64 string and unmarshals the resulting
// JSON into a new Task.
//
// An error is returned when raw is not valid base64 or when the decoded bytes
// cannot be unmarshaled as a task. Both errors wrap their cause, so callers
// can inspect it with errors.Is / errors.As.
func Base64Decode(raw string) (*Task, error) {
	data, err := base64.StdEncoding.DecodeString(raw)
	if err != nil {
		return nil, fmt.Errorf("decoding task: %w", err)
	}

	task := &Task{}
	// NOTE(review): the Buffer round-trip is equivalent to passing data
	// directly; it is kept because this is the only use of the bytes import
	// visible in this file.
	buf := bytes.NewBuffer(data)
	if err := protojson.Unmarshal(buf.Bytes(), task); err != nil {
		return nil, fmt.Errorf("unmarshaling task: %w", err)
	}
	return task, nil
}
// ErrNotFound is returned when a task is not found.
var ErrNotFound = errors.New("task not found")

// ErrConcurrentStateChange signals a conflicting, concurrent change of a
// task's state.
// NOTE(review): the meaning is inferred from the name; confirm against the
// callers. Match this error with errors.Is rather than by its message text.
var ErrConcurrentStateChange = errors.New("concurrent state change")
// Shorthand aliases for the task view levels.
const (
	Minimal = View_MINIMAL
	Basic   = View_BASIC
	Full    = View_FULL
)

// Shorthand aliases for the file types.
const (
	File      = FileType_FILE
	Directory = FileType_DIRECTORY
)
// GenerateID returns a new task ID: the string form of a freshly generated xid.
// IDs are globally unique and sortable.
func GenerateID() string {
	return xid.New().String()
}
// InitTask initializes the task fields that are commonly set by CreateTask
// (Id, State, and CreationTime) and then validates the task; see Validate().
// The given task is modified in place.
//
// If "overwrite" is true, Id, State, and CreationTime are always replaced,
// even if already set; otherwise each one is only set when it is empty
// (State counts as empty when it is Unknown).
//
// An error is returned if task is nil or if validation fails. The validation
// error is wrapped, so callers can reach it with errors.Is / errors.As.
func InitTask(task *Task, overwrite bool) error {
	// Reject nil up front instead of panicking on the field accesses below.
	if task == nil {
		return fmt.Errorf("can't initialize nil task")
	}

	if overwrite || task.Id == "" {
		task.Id = GenerateID()
	}
	if overwrite || task.State == Unknown {
		task.State = Queued
	}
	if overwrite || task.CreationTime == "" {
		task.CreationTime = time.Now().Format(time.RFC3339Nano)
	}

	if err := Validate(task); err != nil {
		return fmt.Errorf("invalid task message:\n%w", err)
	}
	return nil
}
// RunnableState reports whether the state is INITIALIZING or RUNNING.
func RunnableState(s State) bool {
	switch s {
	case State_INITIALIZING, State_RUNNING:
		return true
	default:
		return false
	}
}
// TerminalState reports whether the state is COMPLETE, EXECUTOR_ERROR,
// SYSTEM_ERROR, or CANCELED.
func TerminalState(s State) bool {
	switch s {
	case State_COMPLETE, State_EXECUTOR_ERROR, State_SYSTEM_ERROR, State_CANCELED:
		return true
	default:
		return false
	}
}
// GetBasicView returns the basic view of a task: a copy made with
// deepcopy.Copy in which input contents, system logs, and the stdout/stderr of
// every executor log have been cleared.
func (task *Task) GetBasicView() *Task {
	basic := new(Task)
	// NOTE(review): any value returned by deepcopy.Copy is discarded here, so a
	// failed copy would go unnoticed — confirm this is intentional.
	deepcopy.Copy(basic, task)

	// Drop the inline content of every input.
	for _, input := range basic.Inputs {
		input.Content = ""
	}

	// Drop system logs, plus stdout and stderr of every executor log.
	for _, attempt := range basic.Logs {
		attempt.SystemLogs = nil
		for _, exec := range attempt.Logs {
			exec.Stdout, exec.Stderr = "", ""
		}
	}
	return basic
}
// GetMinimalView returns the minimal view of a task: a new Task carrying only
// the receiver's Id and State.
func (task *Task) GetMinimalView() *Task {
	minimal := new(Task)
	minimal.Id = task.Id
	minimal.State = task.State
	return minimal
}
// GetTaskLog returns the task log entry at index "i".
// If task.Logs is too short, empty entries are appended until index "i" exists.
func (task *Task) GetTaskLog(i int) *TaskLog {
	// Pad with empty entries until the requested index is in range.
	for len(task.Logs) <= i {
		task.Logs = append(task.Logs, &TaskLog{})
	}
	return task.Logs[i]
}
// GetExecLog returns the executor log entry at index "i" within the task log
// for the given "attempt" (looked up through GetTaskLog).
// If that task log has too few executor entries, empty ones are appended until
// index "i" exists.
func (task *Task) GetExecLog(attempt int, i int) *ExecutorLog {
	entry := task.GetTaskLog(attempt)
	// Pad with empty entries until the requested index is in range.
	for len(entry.Logs) <= i {
		entry.Logs = append(entry.Logs, &ExecutorLog{})
	}
	return entry.Logs[i]
}
// GetPageSize takes in the page size from a request and returns the page size
// to use, taking into account the default and maximum documented in the TES
// spec:
//
//   - reqSize <= 0 (unset or invalid): the default of 256
//   - reqSize > 2048: the maximum of 2048
//   - otherwise: reqSize unchanged
func GetPageSize(reqSize int32) int {
	const (
		defaultPageSize = 256
		maxPageSize     = 2048
	)

	switch {
	case reqSize <= 0:
		// A negative request is treated like an unset one rather than being
		// passed on as a negative page size.
		return defaultPageSize
	case reqSize > maxPageSize:
		return maxPageSize
	default:
		return int(reqSize)
	}
}