Skip to content

Commit e08d09b

Browse files
committed
fix: Adding missing file
Signed-off-by: Vincent Boutour <[email protected]>
1 parent 9a823c2 commit e08d09b

File tree

1 file changed

+133
-0
lines changed

1 file changed

+133
-0
lines changed

pkg/tcpool/tcpool.go

+133
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
package tcpool
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"io"
7+
"net"
8+
"strings"
9+
"sync"
10+
11+
"github.com/ViBiOh/kmux/pkg/output"
12+
)
13+
14+
type Pool struct {
15+
sync.RWMutex
16+
backends []string
17+
current uint64
18+
done chan struct{}
19+
}
20+
21+
func New() *Pool {
22+
return &Pool{
23+
done: make(chan struct{}),
24+
}
25+
}
26+
27+
func (bp *Pool) Done() <-chan struct{} {
28+
return bp.done
29+
}
30+
31+
func (bp *Pool) Add(backend string) {
32+
bp.Lock()
33+
defer bp.Unlock()
34+
35+
bp.backends = append(bp.backends, backend)
36+
}
37+
38+
func (bp *Pool) Remove(toRemove string) {
39+
bp.Lock()
40+
defer bp.Unlock()
41+
42+
backends := bp.backends[:0]
43+
for _, backend := range bp.backends {
44+
if backend == toRemove {
45+
continue
46+
}
47+
48+
backends = append(backends, backend)
49+
}
50+
51+
bp.backends = backends
52+
}
53+
54+
func (bp *Pool) next() string {
55+
bp.Lock()
56+
defer bp.Unlock()
57+
58+
backendsLen := uint64(len(bp.backends))
59+
if backendsLen == 0 {
60+
return ""
61+
}
62+
63+
bp.current = (bp.current + 1) % backendsLen
64+
65+
return bp.backends[bp.current]
66+
}
67+
68+
func (bp *Pool) handle(us net.Conn, server string) {
69+
ds, err := net.Dial("tcp", server)
70+
if err != nil {
71+
us.Close()
72+
output.Err("", "dial %s: %s", server, err)
73+
return
74+
}
75+
76+
go copy(ds, us)
77+
go copy(us, ds)
78+
}
79+
80+
func (bp *Pool) Start(ctx context.Context, localPort uint64) {
81+
listenerConfig := &net.ListenConfig{}
82+
listener, err := listenerConfig.Listen(ctx, "tcp", fmt.Sprintf("127.0.0.1:%d", localPort))
83+
if err != nil {
84+
output.Err("", "listen: %s", err)
85+
return
86+
}
87+
88+
defer close(bp.done)
89+
90+
defer func() {
91+
if closeErr := listener.Close(); closeErr != nil {
92+
output.Err("", "listener close: %s", closeErr)
93+
}
94+
}()
95+
96+
output.Std("", "Listening tcp on %d", localPort)
97+
defer output.Std("", "Listening ended.")
98+
99+
connChan := make(chan net.Conn, 4)
100+
go func() {
101+
defer close(connChan)
102+
103+
for {
104+
conn, err := listener.Accept()
105+
if err != nil {
106+
if strings.HasSuffix(err.Error(), "use of closed network connection") {
107+
return
108+
}
109+
110+
output.Err("", "listener accept: %s", err)
111+
continue
112+
}
113+
114+
connChan <- conn
115+
}
116+
}()
117+
118+
for {
119+
select {
120+
case conn := <-connChan:
121+
go bp.handle(conn, bp.next())
122+
case <-ctx.Done():
123+
return
124+
}
125+
}
126+
}
127+
128+
func copy(wc io.WriteCloser, r io.Reader) {
129+
defer wc.Close()
130+
if _, err := io.Copy(wc, r); err != nil {
131+
output.Err("", "pool copy: %s", err)
132+
}
133+
}

0 commit comments

Comments
 (0)