-
-
Notifications
You must be signed in to change notification settings - Fork 512
/
Copy pathpipeline.go
162 lines (140 loc) · 3.5 KB
/
pipeline.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package pty
import (
"encoding/json"
"github.com/0xJacky/Nginx-UI/internal/logger"
"github.com/0xJacky/Nginx-UI/settings"
"github.com/creack/pty"
"github.com/gorilla/websocket"
"github.com/pkg/errors"
"os"
"os/exec"
"time"
"unicode/utf8"
)
type Pipeline struct {
Pty *os.File
cmd *exec.Cmd
ws *websocket.Conn
}
type Message struct {
Type MsgType
Data json.RawMessage
}
const bufferSize = 2048
func NewPipeLine(conn *websocket.Conn) (p *Pipeline, err error) {
c := exec.Command(settings.ServerSettings.StartCmd)
ptmx, err := pty.StartWithSize(c, &pty.Winsize{Cols: 90, Rows: 60})
if err != nil {
return nil, errors.Wrap(err, "start pty error")
}
p = &Pipeline{
Pty: ptmx,
cmd: c,
ws: conn,
}
return
}
func (p *Pipeline) ReadWsAndWritePty(errorChan chan error) {
for {
msgType, payload, err := p.ws.ReadMessage()
if err != nil && websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNoStatusReceived,
websocket.CloseNormalClosure) {
errorChan <- errors.Wrap(err, "Error ReadWsAndWritePty unexpected close")
return
}
if msgType != websocket.TextMessage {
errorChan <- errors.Errorf("Error ReadWsAndWritePty Invalid msgType: %v", msgType)
return
}
var msg Message
err = json.Unmarshal(payload, &msg)
if err != nil {
errorChan <- errors.Wrap(err, "Error ReadWsAndWritePty json.Unmarshal")
return
}
switch msg.Type {
case TypeData:
var data string
err = json.Unmarshal(msg.Data, &data)
if err != nil {
errorChan <- errors.Wrap(err, "Error ReadWsAndWritePty json.Unmarshal msg.Data")
return
}
_, err = p.Pty.Write([]byte(data))
if err != nil {
errorChan <- errors.Wrap(err, "Error ReadWsAndWritePty write pty")
return
}
case TypeResize:
var win struct {
Cols uint16
Rows uint16
}
err = json.Unmarshal(msg.Data, &win)
if err != nil {
errorChan <- errors.Wrap(err, "Error ReadSktAndWritePty Invalid resize message")
return
}
err = pty.Setsize(p.Pty, &pty.Winsize{Rows: win.Rows, Cols: win.Cols})
if err != nil {
errorChan <- errors.Wrap(err, "Error ReadSktAndWritePty set pty size")
return
}
case TypePing:
err = p.ws.WriteControl(websocket.PongMessage, []byte{}, time.Now().Add(time.Second))
if err != nil {
errorChan <- errors.Wrap(err, "Error ReadSktAndWritePty write pong")
return
}
default:
errorChan <- errors.Errorf("Error ReadWsAndWritePty unknown msg.Type %v", msg.Type)
return
}
}
}
func (p *Pipeline) ReadPtyAndWriteWs(errorChan chan error) {
buf := make([]byte, bufferSize)
for {
n, err := p.Pty.Read(buf)
if err != nil {
errorChan <- errors.Wrap(err, "Error ReadPtyAndWriteWs read pty")
return
}
processedOutput := validString(string(buf[:n]))
err = p.ws.WriteMessage(websocket.TextMessage, []byte(processedOutput))
if err != nil && websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
errorChan <- errors.Wrap(err, "Error ReadPtyAndWriteWs websocket write")
return
}
}
}
func (p *Pipeline) Close() {
err := p.Pty.Close()
if err != nil {
logger.Error(err)
}
err = p.cmd.Process.Kill()
if err != nil {
logger.Error(err)
}
_, err = p.cmd.Process.Wait()
if err != nil {
logger.Error(err)
}
}
func validString(s string) string {
if !utf8.ValidString(s) {
v := make([]rune, 0, len(s))
for i, r := range s {
if r == utf8.RuneError {
_, size := utf8.DecodeRuneInString(s[i:])
if size == 1 {
continue
}
}
v = append(v, r)
}
s = string(v)
}
return s
}