-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathoutput.go
135 lines (119 loc) · 3.18 KB
/
output.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package wfl
import (
"errors"
"fmt"
"io/ioutil"
"os"
"github.com/dgruber/drmaa2interface"
)
func getJobOutputKubernetes(job drmaa2interface.Job) (string, error) {
ji, err := job.GetJobInfo()
if err != nil {
return "", fmt.Errorf("failed getting job info: %s", err)
}
if ji.ExtensionList != nil {
if output, ok := ji.ExtensionList["output"]; ok {
if len(output) > 0 && output[len(output)-1] == '\n' {
output = output[:len(output)-1]
}
if len(output) > 0 && output[len(output)-1] == '\r' {
output = output[:len(output)-1]
}
return output, nil
}
}
return "", errors.New("no output in jobinfo")
}
func getJobOutputOS(job drmaa2interface.Job) (string, error) {
template, err := job.GetJobTemplate()
if err != nil {
return "", fmt.Errorf("failed getting job template: %s", err)
}
return getOutputFromPath(template.OutputPath)
}
func getJobOutputDocker(job drmaa2interface.Job) (string, error) {
template, err := job.GetJobTemplate()
if err != nil {
return "", fmt.Errorf("failed getting job template: %s", err)
}
return getOutputFromPath(template.OutputPath)
}
func getOutputFromPath(outputPath string) (string, error) {
if !isPathLocalFile(outputPath) {
return "", fmt.Errorf("output path %s is not a local file",
outputPath)
}
return getFileContent(outputPath)
}
func getFileContent(path string) (string, error) {
data, err := ioutil.ReadFile(path)
if err != nil {
return "", fmt.Errorf("failed reading file %s: %s",
path, err)
}
// OS process -> the output is in the file
// remove trailing newline cr
if len(data) > 0 && data[len(data)-1] == '\n' {
data = data[:len(data)-1]
}
if len(data) > 0 && data[len(data)-1] == '\r' {
data = data[:len(data)-1]
}
return string(data), nil
}
func isPathLocalFile(path string) bool {
normalFile := path != "/dev/null" &&
path != "/dev/stdout" &&
path != "/dev/stderr" &&
path != ""
if normalFile {
// check if path is a file which exists with os.Stat
fi, err := os.Stat(path)
if err != nil {
return false
}
return !fi.IsDir()
}
return false
}
func getJobOutpuForJob(wflType SessionManagerType, job drmaa2interface.Job) (string, error) {
state := job.GetState()
if state == drmaa2interface.Undetermined {
return "", errors.New("job state is undetermined")
}
err := job.WaitTerminated(drmaa2interface.InfiniteTime)
if err != nil {
return "", fmt.Errorf("failed waiting for job termination: %s", err)
}
// for Kubernetes we need the jobinfo "output" extension
switch wflType {
case KubernetesSessionManager:
{
output, err := getJobOutputKubernetes(job)
if err != nil {
return "", fmt.Errorf("failed getting job info for k8s job %s: %s",
job.GetID(), err)
}
return output, nil
}
case DockerSessionManager:
{
output, err := getJobOutputDocker(job)
if err != nil {
return "", fmt.Errorf("failed getting job info for docker job %s: %s",
job.GetID(), err)
}
return output, nil
}
case DefaultSessionManager:
{
output, err := getJobOutputOS(job)
if err != nil {
return "", fmt.Errorf("failed getting job info for OS job %s: %s",
job.GetID(), err)
}
return output, nil
}
}
return "", errors.New("unsupported workflow type")
}