Skip to content

Commit

Permalink
结构调整、kcp支持
Browse files Browse the repository at this point in the history
  • Loading branch information
刘河 committed Feb 9, 2019
1 parent 2e8af6f commit 59d789d
Show file tree
Hide file tree
Showing 60 changed files with 11,087 additions and 773 deletions.
21 changes: 17 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ go语言编写,无第三方依赖,各个平台都已经编译在release中
* [与nginx配合](#与nginx配合)
* [关闭http|https代理](#关闭代理)
* [将nps安装到系统](#将nps安装到系统)
* 单隧道模式及介绍
* 单隧道模式及介绍(即将移除)
* [tcp隧道模式](#tcp隧道模式)
* [udp隧道模式](#udp隧道模式)
* [socks5代理模式](#socks5代理模式)
Expand All @@ -62,6 +62,7 @@ go语言编写,无第三方依赖,各个平台都已经编译在release中
* [带宽限制](#带宽限制)
* [负载均衡](#负载均衡)
* [守护进程](#守护进程)
* [KCP协议支持](#KCP协议支持)
* [相关说明](#相关说明)
* [流量统计](#流量统计)
* [热更新支持](#热更新支持)
Expand Down Expand Up @@ -138,12 +139,13 @@ go语言编写,无第三方依赖,各个平台都已经编译在release中
---|---
httpport | web管理端口
password | web界面管理密码
tcpport | 服务端客户端通信端口
bridePort | 服务端客户端通信端口
pemPath | ssl certFile绝对路径
keyPath | ssl keyFile绝对路径
httpsProxyPort | 域名代理https代理监听端口
httpProxyPort | 域名代理http代理监听端口
authip|web api免验证IP地址
bridgeType|客户端与服务端连接方式kcp或tcp

### 详细说明

Expand Down Expand Up @@ -539,12 +541,23 @@ authip | 免验证ip,适用于web api
### 守护进程
本代理支持守护进程,使用示例如下,服务端客户端所有模式通用,支持linux,darwin,windows。
```
./(nps|npc) start|stop|restart|status xxxxxx
./(nps|npc) start|stop|restart|status 若有其他参数可加其他参数
```
```
(nps|npc).exe start|stop|restart|status xxxxxx
(nps|npc).exe start|stop|restart|status 若有其他参数可加其他参数
```

### KCP协议支持
KCP 是一个快速可靠协议,能以比 TCP浪费10%-20%的带宽的代价,换取平均延迟降低 30%-40%,在弱网环境下对性能能有一定的提升。可在app.conf中修改bridgeType为kcp
,设置后本代理将开启udp端口(bridgePort)

注意:当服务端为kcp时,客户端连接时也需要加上参数

```
-type=kcp
```


## 相关说明

### 获取用户真实ip
Expand Down
154 changes: 92 additions & 62 deletions bridge/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,81 +2,113 @@ package bridge

import (
"errors"
"github.com/cnlh/nps/lib"
"github.com/cnlh/nps/lib/conn"
"github.com/cnlh/nps/lib/file"
"github.com/cnlh/nps/lib/kcp"
"github.com/cnlh/nps/lib/lg"
"github.com/cnlh/nps/lib/pool"
"github.com/cnlh/nps/lib/common"
"net"
"strconv"
"sync"
"time"
)

type Client struct {
tunnel *lib.Conn
signal *lib.Conn
linkMap map[int]*lib.Link
tunnel *conn.Conn
signal *conn.Conn
linkMap map[int]*conn.Link
linkStatusMap map[int]bool
stop chan bool
sync.RWMutex
}

func NewClient(t *conn.Conn, s *conn.Conn) *Client {
return &Client{
linkMap: make(map[int]*conn.Link),
stop: make(chan bool),
linkStatusMap: make(map[int]bool),
signal: s,
tunnel: t,
}
}

type Bridge struct {
TunnelPort int //通信隧道端口
listener *net.TCPListener //server端监听
Client map[int]*Client
RunList map[int]interface{} //运行中的任务
lock sync.Mutex
tunnelLock sync.Mutex
clientLock sync.Mutex
TunnelPort int //通信隧道端口
tcpListener *net.TCPListener //server端监听
kcpListener *kcp.Listener //server端监听
Client map[int]*Client
RunList map[int]interface{} //运行中的任务
tunnelType string //bridge type kcp or tcp
lock sync.Mutex
tunnelLock sync.Mutex
clientLock sync.RWMutex
}

func NewTunnel(tunnelPort int, runList map[int]interface{}) *Bridge {
func NewTunnel(tunnelPort int, runList map[int]interface{}, tunnelType string) *Bridge {
t := new(Bridge)
t.TunnelPort = tunnelPort
t.Client = make(map[int]*Client)
t.RunList = runList
t.tunnelType = tunnelType
return t
}

func (s *Bridge) StartTunnel() error {
var err error
s.listener, err = net.ListenTCP("tcp", &net.TCPAddr{net.ParseIP("0.0.0.0"), s.TunnelPort, ""})
if err != nil {
return err
}
go s.tunnelProcess()
return nil
}

//tcp server
func (s *Bridge) tunnelProcess() error {
var err error
for {
conn, err := s.listener.Accept()
if s.tunnelType == "kcp" {
s.kcpListener, err = kcp.ListenWithOptions(":"+strconv.Itoa(s.TunnelPort), nil, 150, 3)
if err != nil {
return err
}
go func() {
for {
c, err := s.kcpListener.AcceptKCP()
conn.SetUdpSession(c)
if err != nil {
lg.Println(err)
continue
}
go s.cliProcess(conn.NewConn(c))
}
}()
} else {
s.tcpListener, err = net.ListenTCP("tcp", &net.TCPAddr{net.ParseIP("0.0.0.0"), s.TunnelPort, ""})
if err != nil {
lib.Println(err)
continue
return err
}
go s.cliProcess(lib.NewConn(conn))
go func() {
for {
c, err := s.tcpListener.Accept()
if err != nil {
lg.Println(err)
continue
}
go s.cliProcess(conn.NewConn(c))
}
}()
}
return err
return nil
}

//验证失败,返回错误验证flag,并且关闭连接
func (s *Bridge) verifyError(c *lib.Conn) {
c.Write([]byte(lib.VERIFY_EER))
func (s *Bridge) verifyError(c *conn.Conn) {
c.Write([]byte(common.VERIFY_EER))
c.Conn.Close()
}

func (s *Bridge) cliProcess(c *lib.Conn) {
c.SetReadDeadline(5)
func (s *Bridge) cliProcess(c *conn.Conn) {
c.SetReadDeadline(5, s.tunnelType)
var buf []byte
var err error
if buf, err = c.ReadLen(32); err != nil {
c.Close()
return
}
//验证
id, err := lib.GetCsvDb().GetIdByVerifyKey(string(buf), c.Conn.RemoteAddr().String())
id, err := file.GetCsvDb().GetIdByVerifyKey(string(buf), c.Conn.RemoteAddr().String())
if err != nil {
lib.Println("当前客户端连接校验错误,关闭此客户端:", c.Conn.RemoteAddr())
lg.Println("当前客户端连接校验错误,关闭此客户端:", c.Conn.RemoteAddr())
s.verifyError(c)
return
}
Expand All @@ -97,40 +129,39 @@ func (s *Bridge) closeClient(id int) {
}

//tcp连接类型区分
func (s *Bridge) typeDeal(typeVal string, c *lib.Conn, id int) {
func (s *Bridge) typeDeal(typeVal string, c *conn.Conn, id int) {
switch typeVal {
case lib.WORK_MAIN:
case common.WORK_MAIN:
//客户端已经存在,下线
s.clientLock.Lock()
if _, ok := s.Client[id]; ok {
if v, ok := s.Client[id]; ok {
s.clientLock.Unlock()
s.closeClient(id)
if v.signal != nil {
v.signal.WriteClose()
}
v.Lock()
v.signal = c
v.Unlock()
} else {
s.Client[id] = NewClient(nil, c)
s.clientLock.Unlock()
}
s.clientLock.Lock()

s.Client[id] = &Client{
linkMap: make(map[int]*lib.Link),
stop: make(chan bool),
linkStatusMap: make(map[int]bool),
}
lib.Printf("客户端%d连接成功,地址为:%s", id, c.Conn.RemoteAddr())
s.Client[id].signal = c
s.clientLock.Unlock()
lg.Printf("客户端%d连接成功,地址为:%s", id, c.Conn.RemoteAddr())
go s.GetStatus(id)
case lib.WORK_CHAN:
case common.WORK_CHAN:
s.clientLock.Lock()
if v, ok := s.Client[id]; ok {
s.clientLock.Unlock()
v.Lock()
v.tunnel = c
v.Unlock()
} else {
s.Client[id] = NewClient(c, nil)
s.clientLock.Unlock()
return
}
go s.clientCopy(id)
}
c.SetAlive()
c.SetAlive(s.tunnelType)
return
}

Expand Down Expand Up @@ -161,13 +192,13 @@ func (s *Bridge) waitStatus(clientId, id int) (bool) {
return false
}

func (s *Bridge) SendLinkInfo(clientId int, link *lib.Link) (tunnel *lib.Conn, err error) {
func (s *Bridge) SendLinkInfo(clientId int, link *conn.Link) (tunnel *conn.Conn, err error) {
s.clientLock.Lock()
if v, ok := s.Client[clientId]; ok {
s.clientLock.Unlock()
v.signal.SendLinkInfo(link)
if err != nil {
lib.Println("send error:", err, link.Id)
lg.Println("send error:", err, link.Id)
s.DelClient(clientId)
return
}
Expand All @@ -192,7 +223,7 @@ func (s *Bridge) SendLinkInfo(clientId int, link *lib.Link) (tunnel *lib.Conn, e
}

//得到一个tcp隧道
func (s *Bridge) GetTunnel(id int, en, de int, crypt, mux bool) (conn *lib.Conn, err error) {
func (s *Bridge) GetTunnel(id int, en, de int, crypt, mux bool) (conn *conn.Conn, err error) {
s.clientLock.Lock()
defer s.clientLock.Unlock()
if v, ok := s.Client[id]; !ok {
Expand All @@ -204,7 +235,7 @@ func (s *Bridge) GetTunnel(id int, en, de int, crypt, mux bool) (conn *lib.Conn,
}

//得到一个通信通道
func (s *Bridge) GetSignal(id int) (conn *lib.Conn, err error) {
func (s *Bridge) GetSignal(id int) (conn *conn.Conn, err error) {
s.clientLock.Lock()
defer s.clientLock.Unlock()
if v, ok := s.Client[id]; !ok {
Expand Down Expand Up @@ -257,19 +288,19 @@ func (s *Bridge) clientCopy(clientId int) {
for {
if id, err := client.tunnel.GetLen(); err != nil {
s.closeClient(clientId)
lib.Println("读取msg id 错误", err, id)
lg.Println("读取msg id 错误", err, id)
break
} else {
client.Lock()
if link, ok := client.linkMap[id]; ok {
client.Unlock()
if content, err := client.tunnel.GetMsgContent(link); err != nil {
lib.PutBufPoolCopy(content)
pool.PutBufPoolCopy(content)
s.closeClient(clientId)
lib.Println("read msg content error", err, "close client")
lg.Println("read msg content error", err, "close client")
break
} else {
if len(content) == len(lib.IO_EOF) && string(content) == lib.IO_EOF {
if len(content) == len(common.IO_EOF) && string(content) == common.IO_EOF {
if link.Conn != nil {
link.Conn.Close()
}
Expand All @@ -281,13 +312,12 @@ func (s *Bridge) clientCopy(clientId int) {
}
link.Flow.Add(0, len(content))
}
lib.PutBufPoolCopy(content)
pool.PutBufPoolCopy(content)
}
} else {
client.Unlock()
continue
}
}
}

}
Loading

0 comments on commit 59d789d

Please sign in to comment.