Skip to content

Commit 75c039b

Browse files
committed
feat: customization of WebSocket-related parameters
1 parent 19628e6 commit 75c039b

File tree

5 files changed

+80
-29
lines changed

5 files changed

+80
-29
lines changed

app/pages/Schema/SchemaConfig/List/SpaceStats/index.tsx

-1
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,6 @@ const SpaceStats = () => {
127127
}
128128
};
129129
const showTime = updateTime && jobId == null && !loading;
130-
console.log('=====updateTime', updateTime);
131130
return (
132131
<div className={styles.nebulaStats}>
133132
<div className={styles.row}>

app/utils/websocket.ts

+6-3
Original file line numberDiff line numberDiff line change
@@ -225,17 +225,20 @@ export class NgqlRunner {
225225
onError = (e: Event) => {
226226
console.error('=====ngqlSocket error', e);
227227
message.error('WebSocket error, try to reconnect...');
228-
this.onDisconnect();
228+
this.onDisconnect(e);
229229
};
230230

231-
onDisconnect = () => {
232-
console.log('=====onDisconnect');
231+
onDisconnect = (e?: CloseEvent | Event) => {
232+
console.log('=====onDisconnect', e);
233233
this.socket?.removeEventListener('close', this.onDisconnect);
234234
this.socket?.removeEventListener('error', this.onError);
235235

236236
this.clearMessageReceiver();
237237
this.closeSocket();
238238

239+
const { code, reason } = (e as CloseEvent) || {};
240+
reason && message.error(`WebSocket closed unexpectedly, code: ${code}, reason: \`${reason}\`, try to reconnect...`);
241+
239242
// try reconnect
240243
this.socketUrl && setTimeout(this.reConnect, 3000);
241244
};

server/api/studio/etc/studio-api.yaml

+17
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,23 @@ Log:
1212
KeepDays: 7
1313
Debug:
1414
Enable: false
15+
WebSocket:
16+
# The maximum wait time (secend) for the pong message from peer.
17+
# If a peer does not respond to the ping message within this time, websocket connection will be closed.
18+
# default 60s, 0 means no limit
19+
WriteDeadline: 60
20+
# The maximum wait time (secend) for the ping message from peer.
21+
# If a peer does not send a ping message within this time, websocket connection will be closed.
22+
# default 60s, 0 means no limit
23+
ReadDeadline: 60
24+
# The maximum message size allowed from peer.
25+
# If a peer sends a message larger than this, an error will be thrown and websocket connection will be closed.
26+
# default: 64MB (32 * 1024 * 1024), 0 means no limit or system limit
27+
WriteLimit: 33554432
28+
# The maximum message size allowed from peer.
29+
# If a peer sends a message larger than this, websocket connection will be closed.
30+
# default: 8MB (8 * 1024 * 1024), 0 means no limit or system limit
31+
ReadLimit: 8388608
1532
Auth:
1633
TokenName: "studio_token"
1734
AccessSecret: "login_secret"

server/api/studio/internal/config/config.go

+19
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,25 @@ type Config struct {
3232
TasksDir string
3333
} `json:",optional"`
3434

35+
WebSocket struct {
36+
// The maximum wait time (secend) for the pong message from peer.
37+
// If a peer does not respond to the ping message within this time, websocket will close the connection.
38+
// default 60s, 0 means no limit
39+
WriteDeadline int64 `json:",default=60"`
40+
// The maximum wait time (secend) for the ping message from peer.
41+
// If a peer does not send a ping message within this time, websocket will close the connection.
42+
// default 60s, 0 means no limit
43+
ReadDeadline int64 `json:",default=60"`
44+
// The maximum message size allowed from peer.
45+
// If a peer sends a message larger than this, a `websocket: write limit exceeded` error will be returned.
46+
// default: 64MB (32 * 1024 * 1024), 0 means no limit or system limit
47+
WriteLimit int64 `json:",default=33554432"`
48+
// The maximum message size allowed from peer.
49+
// If a peer sends a message larger than this, websocket will close the connection.
50+
// default: 8MB (8 * 1024 * 1024), 0 means no limit or system limit
51+
ReadLimit int64 `json:",default=8388608"`
52+
} `json:",optional"`
53+
3554
DB struct {
3655
LogLevel int `json:",default=4"`
3756
IgnoreRecordNotFoundError bool `json:",default=true"`

server/api/studio/pkg/ws/utils/client.go

+38-25
Original file line numberDiff line numberDiff line change
@@ -3,33 +3,19 @@ package utils
33
import (
44
"bytes"
55
"encoding/json"
6+
"fmt"
67
"sync"
78
"time"
89

910
"github.com/gorilla/websocket"
1011
uuid "github.com/satori/go.uuid"
12+
"github.com/vesoft-inc/nebula-studio/server/api/studio/internal/config"
1113
"github.com/zeromicro/go-zero/core/logx"
1214
)
1315

1416
const (
15-
// Time allowed to write a message to the peer.
16-
writeWait = 10 * time.Second
17-
18-
// Time allowed to read the next pong message from the peer.
19-
pongWait = 60 * time.Second
20-
21-
// Send pings to peer with this period. Must be less than pongWait.
22-
pingPeriod = (pongWait * 9) / 10
23-
24-
// Maximum message size allowed from peer.
25-
// Max ngql length can be executed is 4MB
26-
maxMessageSize = 8 * 1024 * 1024
27-
28-
// send buffer size
29-
bufSize = 512
30-
31-
heartbeatRequest = "1"
32-
17+
bufSize = 512 // send buffer size
18+
heartbeatRequest = "1"
3319
heartbeatResponse = "2"
3420
)
3521

@@ -157,10 +143,15 @@ func (c *Client) Destroy() {
157143
// reads from this goroutine.
158144
func (c *Client) readPump() {
159145
defer c.Destroy()
160-
161-
c.Conn.SetReadLimit(maxMessageSize)
162-
c.Conn.SetReadDeadline(time.Now().Add(pongWait))
163-
c.Conn.SetPongHandler(func(string) error { c.Conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
146+
wsConfig := config.GetConfig().WebSocket
147+
if wsConfig.ReadLimit > 0 {
148+
c.Conn.SetReadLimit(wsConfig.ReadLimit)
149+
}
150+
if wsConfig.ReadDeadline > 0 {
151+
pongWait := time.Duration(wsConfig.ReadDeadline) * time.Second
152+
c.Conn.SetReadDeadline(time.Now().Add(pongWait))
153+
c.Conn.SetPongHandler(func(string) error { c.Conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
154+
}
164155
for {
165156
_, message, err := c.Conn.ReadMessage()
166157
if err != nil {
@@ -191,7 +182,15 @@ func (c *Client) readPump() {
191182
// application ensures that there is at most one writer to a connection by
192183
// executing all writes from this goroutine.
193184
func (c *Client) writePump() {
194-
ticker := time.NewTicker(pingPeriod)
185+
wsConfig := config.GetConfig().WebSocket
186+
187+
writeWait := 0 * time.Second
188+
ticker := time.NewTicker(30 * time.Second)
189+
190+
if wsConfig.WriteDeadline > 0 {
191+
writeWait = time.Duration(wsConfig.WriteDeadline) * time.Second
192+
}
193+
195194
defer func() {
196195
ticker.Stop()
197196
c.Destroy()
@@ -200,14 +199,25 @@ func (c *Client) writePump() {
200199
for {
201200
select {
202201
case message, ok := <-c.send:
203-
c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
204202
if !ok {
205203
// The hub closed the channel.
206204
logx.Errorf("[WebSocket writePump]: c.send length: %v", len(c.send))
207205
c.Conn.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(time.Second))
208206
return
209207
}
210208

209+
if writeWait > 0 {
210+
// no timeout limit if writeWait is 0
211+
c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
212+
}
213+
214+
msgLen := len(message)
215+
if wsConfig.WriteLimit > 0 && int64(msgLen) > wsConfig.WriteLimit {
216+
errMsg := websocket.FormatCloseMessage(websocket.CloseMessageTooBig, fmt.Sprintf("message length (%d) exceeds config limit (%d)", msgLen, wsConfig.WriteLimit))
217+
c.Conn.WriteControl(websocket.CloseMessage, errMsg, time.Now().Add(time.Second))
218+
return
219+
}
220+
211221
w, err := c.Conn.NextWriter(websocket.TextMessage)
212222
if err != nil {
213223
logx.Errorf("[WebSocket WriteMessage]: %v", err)
@@ -220,7 +230,10 @@ func (c *Client) writePump() {
220230
return
221231
}
222232
case <-ticker.C:
223-
c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
233+
if writeWait > 0 {
234+
// no timeout limit if writeWait is 0
235+
c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
236+
}
224237
if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {
225238
logx.Errorf("[WebSocket ticker error]: %v", err)
226239
return

0 commit comments

Comments
 (0)