Skip to content

Commit

Permalink
1.完成player功能
Browse files Browse the repository at this point in the history
  • Loading branch information
osgochina committed Jun 30, 2022
1 parent a5ca3c3 commit 285a426
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 40 deletions.
46 changes: 34 additions & 12 deletions cmd/player/tcpServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/gogf/gf/os/gcfg"
"github.com/gogf/gf/os/glog"
"github.com/osgochina/dmicro/drpc"
"github.com/osgochina/dmicro/drpc/status"
"github.com/osgochina/dmicro/easyservice"
"github.com/vprix/vncproxy/rfb"
"github.com/vprix/vncproxy/security"
Expand Down Expand Up @@ -66,18 +67,39 @@ func (that *TcpSandBox) Setup() error {
}
return err
}
svrSession := session.NewServerSession(
rfb.OptSecurityHandlers(securityHandlers...),
rfb.OptGetConn(func(sess rfb.ISession) (io.ReadWriteCloser, error) {
return conn, nil
}),
)
play := vnc.NewPlayer(that.cfg.GetString("rbsFile"), svrSession)
_ = play.Start()
for {
err = <-play.Error()
glog.Warning(err)
}
go func(c net.Conn) {
defer func() {
//捕获错误,并且继续执行
if p := recover(); p != nil {
err = fmt.Errorf("panic:%v\n%s", p, status.PanicStackTrace())
}
}()
svrSession := session.NewServerSession(
rfb.OptSecurityHandlers(securityHandlers...),
rfb.OptGetConn(func(sess rfb.ISession) (io.ReadWriteCloser, error) {
return c, nil
}),
)
play := vnc.NewPlayer(that.cfg.GetString("rbsFile"), svrSession)
err = play.Start()
if err != nil {
glog.Warning(err)
return
}
for {
select {
case err = <-play.Error():
glog.Warning(err)
return
case <-that.closed:
play.Close()
return
case <-play.Wait():
return
}
}
}(conn)

}
}

Expand Down
15 changes: 12 additions & 3 deletions cmd/player/wsServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,19 @@ func (that *WSSandBox) Setup() error {
}),
)
play := vnc.NewPlayer(that.cfg.GetString("rfbFile"), svrSession)
_ = play.Start()
for {
err := <-play.Error()
err := play.Start()
if err != nil {
glog.Warning(err)
return
}
for {
select {
case err = <-play.Error():
glog.Warning(err)
return
case <-play.Wait():
return
}
}
})
h.ServeHTTP(r.Response.Writer, r.Request)
Expand Down
9 changes: 8 additions & 1 deletion cmd/recorder/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,14 @@ func (that *RecorderSandBox) Setup() error {
logger.Fatal(err)
}
}()
return nil
for {
select {
case err := <-that.recorder.Error():
logger.Error(err)
case <-that.recorder.Wait():
return nil
}
}
}

func (that *RecorderSandBox) Shutdown() error {
Expand Down
2 changes: 1 addition & 1 deletion session/player.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (that *PlayerSession) Start() {
that.options.ErrorCh <- err
return
}
that.SetHeight(fbWeight)
that.SetHeight(fbHeight)

var pixelFormat rfb.PixelFormat
err = binary.Read(that.br, binary.BigEndian, &pixelFormat)
Expand Down
63 changes: 42 additions & 21 deletions vnc/player.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Player struct {
svrSession *session.ServerSession // vnc客户端连接到proxy的会话
playerSession *session.PlayerSession
errorCh chan error
closed chan struct{}
syncOnce sync.Once
}

Expand All @@ -36,20 +37,24 @@ func NewPlayer(filePath string, svrSession *session.ServerSession) *Player {
errorCh: make(chan error, 32),
svrSession: svrSession,
playerSession: playerSession,
closed: make(chan struct{}),
}
}

// Start 启动
func (that *Player) Start() error {

_ = that.svrSession.Init(rfb.OptHandlers([]rfb.IHandler{
err := that.svrSession.Init(rfb.OptHandlers([]rfb.IHandler{
&handler.ServerVersionHandler{},
&handler.ServerSecurityHandler{},
that, // 把链接到vnc服务端的逻辑加入
&handler.ServerClientInitHandler{},
&handler.ServerServerInitHandler{},
&handler.ServerMessageHandler{},
}...))
if err != nil {
return err
}

that.svrSession.Start()
return nil
Expand Down Expand Up @@ -77,8 +82,13 @@ func (that *Player) handleIO() {
return
case err := <-that.svrSession.Options().ErrorCh:
that.errorCh <- err
that.Close()
case err := <-that.playerSession.Options().ErrorCh:
that.errorCh <- err
that.Close()
case <-that.closed:
_ = that.svrSession.Close()
_ = that.playerSession.Close()
case msg := <-that.svrSession.Options().Output:
if logger.IsDebug() {
logger.Debugf("收到vnc客户端发送过来的消息,%s", msg)
Expand All @@ -93,30 +103,41 @@ func (that *Player) handleIO() {
}

func (that *Player) readRbs() {
// 从会话中读取消息类型
var messageType rfb.ServerMessageType
if err := binary.Read(that.playerSession, binary.BigEndian, &messageType); err != nil {
that.errorCh <- err
return
}
msg := &messages.FramebufferUpdate{}
// 读取消息内容
parsedMsg, err := msg.Read(that.playerSession)
if err != nil {
that.errorCh <- err
return
}
that.svrSession.Options().Input <- parsedMsg
var sleep int64
_ = binary.Read(that.playerSession, binary.BigEndian, &sleep)
if sleep > 0 {
time.Sleep(time.Duration(sleep))
for {
select {
case <-that.closed:
return
default:
// 从会话中读取消息类型
var messageType rfb.ServerMessageType
if err := binary.Read(that.playerSession, binary.BigEndian, &messageType); err != nil {
that.playerSession.Options().ErrorCh <- err
return
}
msg := &messages.FramebufferUpdate{}
// 读取消息内容
parsedMsg, err := msg.Read(that.playerSession)
if err != nil {
that.playerSession.Options().ErrorCh <- err
return
}
that.svrSession.Options().Input <- parsedMsg
var sleep int64
_ = binary.Read(that.playerSession, binary.BigEndian, &sleep)
if sleep > 0 {
time.Sleep(time.Duration(sleep))
}
}
}

}

func (that *Player) Wait() <-chan struct{} {
return that.closed
}

func (that *Player) Close() {
_ = that.svrSession.Close()
_ = that.playerSession.Close()
that.closed <- struct{}{}
}

func (that *Player) Error() <-chan error {
Expand Down
10 changes: 8 additions & 2 deletions vnc/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

type Recorder struct {
errorCh chan error
closed chan struct{}
cliSession *session.ClientSession // 链接到vnc服务端的会话
recorderSession *session.RecorderSession
}
Expand All @@ -20,6 +21,7 @@ func NewRecorder(recorderSess *session.RecorderSession, cliSession *session.Clie
recorderSession: recorderSess,
cliSession: cliSession,
errorCh: make(chan error, 32),
closed: make(chan struct{}),
}
return recorder
}
Expand Down Expand Up @@ -86,15 +88,19 @@ func (that *Recorder) Start() error {
return nil
case err = <-that.cliSession.Options().ErrorCh:
that.errorCh <- err
that.Close()
case err = <-that.recorderSession.Options().ErrorCh:
that.errorCh <- err
that.Close()
}
}
}

func (that *Recorder) Wait() <-chan struct{} {
return that.closed
}
func (that *Recorder) Close() {
_ = that.cliSession.Close()
_ = that.recorderSession.Close()
that.closed <- struct{}{}
}

func (that *Recorder) Error() <-chan error {
Expand Down

0 comments on commit 285a426

Please sign in to comment.