Skip to content

Commit 9faf4ac

Browse files
committed
connection pool: ssh can't work when pool_count is set, fix #193
1 parent 4c3fb22 commit 9faf4ac

File tree

3 files changed

+69
-19
lines changed

3 files changed

+69
-19
lines changed

src/models/server/server.go

+1
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,7 @@ func (p *ProxyServer) getWorkConn() (workConn *conn.Conn, err error) {
384384
err = fmt.Errorf("ProxyName [%s], no work connections available, control is closing", p.Name)
385385
return
386386
}
387+
log.Debug("ProxyName [%s], get work connection from pool", p.Name)
387388
default:
388389
// no work connections available in the poll, send message to frpc to get more
389390
p.ctlMsgChan <- 1

src/utils/conn/conn.go

+65-18
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package conn
1616

1717
import (
1818
"bufio"
19+
"bytes"
1920
"encoding/base64"
2021
"fmt"
2122
"io"
@@ -25,6 +26,8 @@ import (
2526
"strings"
2627
"sync"
2728
"time"
29+
30+
"github.com/fatedier/frp/src/utils/pool"
2831
)
2932

3033
type Listener struct {
@@ -61,11 +64,7 @@ func Listen(bindAddr string, bindPort int64) (l *Listener, err error) {
6164
continue
6265
}
6366

64-
c := &Conn{
65-
TcpConn: conn,
66-
closeFlag: false,
67-
}
68-
c.Reader = bufio.NewReader(c.TcpConn)
67+
c := NewConn(conn)
6968
l.accept <- c
7069
}
7170
}()
@@ -95,20 +94,23 @@ func (l *Listener) Close() error {
9594
type Conn struct {
9695
TcpConn net.Conn
9796
Reader *bufio.Reader
97+
buffer *bytes.Buffer
9898
closeFlag bool
99-
mutex sync.RWMutex
99+
100+
mutex sync.RWMutex
100101
}
101102

102103
func NewConn(conn net.Conn) (c *Conn) {
103-
c = &Conn{}
104-
c.TcpConn = conn
104+
c = &Conn{
105+
TcpConn: conn,
106+
buffer: nil,
107+
closeFlag: false,
108+
}
105109
c.Reader = bufio.NewReader(c.TcpConn)
106-
c.closeFlag = false
107-
return c
110+
return
108111
}
109112

110113
func ConnectServer(addr string) (c *Conn, err error) {
111-
c = &Conn{}
112114
servertAddr, err := net.ResolveTCPAddr("tcp", addr)
113115
if err != nil {
114116
return
@@ -117,9 +119,7 @@ func ConnectServer(addr string) (c *Conn, err error) {
117119
if err != nil {
118120
return
119121
}
120-
c.TcpConn = conn
121-
c.Reader = bufio.NewReader(c.TcpConn)
122-
c.closeFlag = false
122+
c = NewConn(conn)
123123
return c, nil
124124
}
125125

@@ -185,7 +185,23 @@ func (c *Conn) GetLocalAddr() (addr string) {
185185
}
186186

187187
func (c *Conn) Read(p []byte) (n int, err error) {
188-
n, err = c.Reader.Read(p)
188+
c.mutex.RLock()
189+
if c.buffer == nil {
190+
c.mutex.RUnlock()
191+
return c.Reader.Read(p)
192+
}
193+
c.mutex.RUnlock()
194+
195+
n, err = c.buffer.Read(p)
196+
if err == io.EOF {
197+
c.mutex.Lock()
198+
c.buffer = nil
199+
c.mutex.Unlock()
200+
var n2 int
201+
n2, err = c.Reader.Read(p[n:])
202+
203+
n += n2
204+
}
189205
return
190206
}
191207

@@ -212,6 +228,16 @@ func (c *Conn) WriteString(content string) (err error) {
212228
return err
213229
}
214230

231+
func (c *Conn) AppendReaderBuffer(content []byte) {
232+
c.mutex.Lock()
233+
defer c.mutex.Unlock()
234+
235+
if c.buffer == nil {
236+
c.buffer = bytes.NewBuffer(make([]byte, 0, 2048))
237+
}
238+
c.buffer.Write(content)
239+
}
240+
215241
func (c *Conn) SetDeadline(t time.Time) error {
216242
return c.TcpConn.SetDeadline(t)
217243
}
@@ -238,22 +264,36 @@ func (c *Conn) IsClosed() (closeFlag bool) {
238264
}
239265

240266
// when you call this function, you should make sure that
241-
// remote client won't send any bytes to this socket
267+
// no bytes were read before
242268
func (c *Conn) CheckClosed() bool {
243269
c.mutex.RLock()
244270
if c.closeFlag {
271+
c.mutex.RUnlock()
245272
return true
246273
}
247274
c.mutex.RUnlock()
248275

276+
tmp := pool.GetBuf(2048)
277+
defer pool.PutBuf(tmp)
249278
err := c.TcpConn.SetReadDeadline(time.Now().Add(time.Millisecond))
250279
if err != nil {
251280
c.Close()
252281
return true
253282
}
254283

255-
var tmp []byte = make([]byte, 1)
256-
_, err = c.TcpConn.Read(tmp)
284+
n, err := c.TcpConn.Read(tmp)
285+
if err == io.EOF {
286+
return true
287+
}
288+
289+
var tmp2 []byte = make([]byte, 1)
290+
err = c.TcpConn.SetReadDeadline(time.Now().Add(time.Millisecond))
291+
if err != nil {
292+
c.Close()
293+
return true
294+
}
295+
296+
n2, err := c.TcpConn.Read(tmp2)
257297
if err == io.EOF {
258298
return true
259299
}
@@ -263,5 +303,12 @@ func (c *Conn) CheckClosed() bool {
263303
c.Close()
264304
return true
265305
}
306+
307+
if n > 0 {
308+
c.AppendReaderBuffer(tmp[:n])
309+
}
310+
if n2 > 0 {
311+
c.AppendReaderBuffer(tmp2[:n2])
312+
}
266313
return false
267314
}

src/utils/vhost/vhost.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -205,16 +205,18 @@ func (sc *sharedConn) Read(p []byte) (n int, err error) {
205205
sc.Unlock()
206206
return sc.Conn.Read(p)
207207
}
208+
sc.Unlock()
208209
n, err = sc.buff.Read(p)
209210

210211
if err == io.EOF {
212+
sc.Lock()
211213
sc.buff = nil
214+
sc.Unlock()
212215
var n2 int
213216
n2, err = sc.Conn.Read(p[n:])
214217

215218
n += n2
216219
}
217-
sc.Unlock()
218220
return
219221
}
220222

0 commit comments

Comments
 (0)