Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rtsp server support rtsp-over-websocket play #341

Merged
merged 1 commit into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion conf/lalserver.conf.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@
"auth_enable": false,
"auth_method": 1,
"username": "q191201771",
"password": "pengrl"
"password": "pengrl",
"ws_rtsp_enable": true,
"ws_rtsp_addr": ":5566"
},
"record": {
"enable_flv": false,
Expand Down
2 changes: 1 addition & 1 deletion pkg/base/basic_http_sub_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (session *BasicHttpSubSession) Dispose() error {

func (session *BasicHttpSubSession) WriteHttpResponseHeader(b []byte) {
if session.IsWebSocket {
session.write(UpdateWebSocketHeader(session.WebSocketKey))
session.write(UpdateWebSocketHeader(session.WebSocketKey, ""))
} else {
session.write(b)
}
Expand Down
156 changes: 147 additions & 9 deletions pkg/base/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@
package base

import (
"bufio"
"crypto/sha1"
"encoding/base64"
"encoding/binary"
"fmt"
"io"
"math"

"github.com/q191201771/naza/pkg/bele"
Expand Down Expand Up @@ -140,17 +144,151 @@ func MakeWsFrameHeader(wsHeader WsHeader) (buf []byte) {
}
return buf
}
func UpdateWebSocketHeader(secWebSocketKey string) []byte {

func UpdateWebSocketHeader(secWebSocketKey, protocol string) []byte {
firstLine := "HTTP/1.1 101 Switching Protocol\r\n"
sha1Sum := sha1.Sum([]byte(secWebSocketKey + WsMagicStr))
secWebSocketAccept := base64.StdEncoding.EncodeToString(sha1Sum[:])
webSocketResponseHeaderStr := firstLine +
"Server: " + LalHttpflvSubSessionServer + "\r\n" +
"Sec-WebSocket-Accept:" + secWebSocketAccept + "\r\n" +
"Keep-Alive: timeout=15, max=100\r\n" +
"Connection: Upgrade\r\n" +
"Upgrade: websocket\r\n" +
CorsHeaders +
"\r\n"

var webSocketResponseHeaderStr string
if protocol == "" {
webSocketResponseHeaderStr = firstLine +
"Server: " + LalHttpflvSubSessionServer + "\r\n" +
"Sec-WebSocket-Accept:" + secWebSocketAccept + "\r\n" +
"Keep-Alive: timeout=15, max=100\r\n" +
"Connection: Upgrade\r\n" +
"Upgrade: websocket\r\n" +
CorsHeaders +
"\r\n"
} else {
webSocketResponseHeaderStr = firstLine +
"Server: " + LalHttpflvSubSessionServer + "\r\n" +
"Sec-WebSocket-Accept:" + secWebSocketAccept + "\r\n" +
"Keep-Alive: timeout=15, max=100\r\n" +
"Connection: Upgrade\r\n" +
"Upgrade: websocket\r\n" +
CorsHeaders +
"Sec-WebSocket-Protocol:" + protocol + "\r\n" +
"\r\n"
}
return []byte(webSocketResponseHeaderStr)
}

func ReadWsPayload(r *bufio.Reader) ([]byte, error) {
var h WsHeader

buf := make([]byte, 2)
_, err := io.ReadFull(r, buf)
if err != nil {
return nil, err
}

h.Fin = (buf[0] & 0x80) != 0
h.Rsv1 = (buf[0] & 0x40) != 0
h.Rsv2 = (buf[0] & 0x20) != 0
h.Rsv3 = (buf[0] & 0x10) != 0
h.Opcode = buf[0] & 0x0f

if buf[1]&0x80 != 0 {
h.Masked = true
}

length := buf[1] & 0x7f
switch {
case length < 126:
h.PayloadLength = uint64(length)
case length == 126:
buf = make([]byte, 2)
_, err := io.ReadFull(r, buf)
if err != nil {
return nil, err
}

h.PayloadLength = uint64(binary.BigEndian.Uint16(buf))
case length == 127:
buf = make([]byte, 8)
_, err := io.ReadFull(r, buf)
if err != nil {
return nil, err
}

h.PayloadLength = binary.BigEndian.Uint64(buf)

default:
err = fmt.Errorf("header error: the most significant bit must be 0")
return nil, err
}

if h.Masked {
buf = make([]byte, 4)
_, err := io.ReadFull(r, buf)
if err != nil {
return nil, err
}

h.MaskKey = bele.BeUint32(buf)
}

payload := make([]byte, h.PayloadLength)
_, err = io.ReadFull(r, payload)
if err != nil {
return nil, err
}

if h.Masked {
mask := make([]byte, 4)
binary.BigEndian.PutUint32(mask, h.MaskKey)
cipher(payload, mask, 0)
}

return payload, nil
}

func cipher(payload []byte, mask []byte, offset int) {
n := len(payload)
if n < 8 {
for i := 0; i < n; i++ {
payload[i] ^= mask[(offset+i)%4]
}
return
}

// Calculate position in mask due to previously processed bytes number.
mpos := offset % 4
// Count number of bytes will processed one by one from the beginning of payload.
ln := remain[mpos]
// Count number of bytes will processed one by one from the end of payload.
// This is done to process payload by 8 bytes in each iteration of main loop.
rn := (n - ln) % 8

for i := 0; i < ln; i++ {
payload[i] ^= mask[(mpos+i)%4]
}
for i := n - rn; i < n; i++ {
payload[i] ^= mask[(mpos+i)%4]
}

// NOTE: we use here binary.LittleEndian regardless of what is real
// endianness on machine is. To do so, we have to use binary.LittleEndian in
// the masking loop below as well.
var (
m = binary.LittleEndian.Uint32((mask[:]))
m2 = uint64(m)<<32 | uint64(m)
)
// Skip already processed right part.
// Get number of uint64 parts remaining to process.
n = (n - ln - rn) >> 3
for i := 0; i < n; i++ {
var (
j = ln + (i << 3)
chunk = payload[j : j+8]
)
p := binary.LittleEndian.Uint64(chunk)
p = p ^ m2
binary.LittleEndian.PutUint64(chunk, p)
}
}

// remain maps position in masking key [0,4) to number
// of bytes that need to be processed manually inside Cipher().
var remain = [4]int{0, 3, 2, 1}
2 changes: 2 additions & 0 deletions pkg/logic/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ type RtspConfig struct {
RtspsCertFile string `json:"rtsps_cert_file"`
RtspsKeyFile string `json:"rtsps_key_file"`
OutWaitKeyFrameFlag bool `json:"out_wait_key_frame_flag"`
WsRtspEnable bool `json:"ws_rtsp_enable"`
WsRtspAddr string `json:"ws_rtsp_addr"`
rtsp.ServerAuthConfig
}

Expand Down
16 changes: 15 additions & 1 deletion pkg/logic/server_manager__.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ package logic
import (
"flag"
"fmt"
"github.com/q191201771/naza/pkg/taskpool"
"net/http"
_ "net/http/pprof"
"os"
"path/filepath"
"sync"
"time"

"github.com/q191201771/naza/pkg/taskpool"

"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/hls"
"github.com/q191201771/lal/pkg/httpflv"
Expand All @@ -45,6 +46,7 @@ type ServerManager struct {
rtspsServer *rtsp.Server
httpApiServer *HttpApiServer
pprofServer *http.Server
wsrtspServer *rtsp.WebsocketServer
exitChan chan struct{}

mutex sync.Mutex
Expand Down Expand Up @@ -139,6 +141,9 @@ Doc: %s
if sm.config.RtspConfig.RtspsEnable {
sm.rtspsServer = rtsp.NewServer(sm.config.RtspConfig.RtspsAddr, sm, sm.config.RtspConfig.ServerAuthConfig)
}
if sm.config.RtspConfig.WsRtspEnable {
sm.wsrtspServer = rtsp.NewWebsocketServer(sm.config.RtspConfig.WsRtspAddr, sm, sm.config.RtspConfig.ServerAuthConfig)
}
if sm.config.HttpApiConfig.Enable {
sm.httpApiServer = NewHttpApiServer(sm.config.HttpApiConfig.Addr, sm)
}
Expand Down Expand Up @@ -268,6 +273,15 @@ func (sm *ServerManager) RunLoop() error {
}
}

if sm.wsrtspServer != nil {
go func() {
err := sm.wsrtspServer.Listen()
if err != nil {
Log.Error(err)
}
}()
}

if sm.httpApiServer != nil {
if err := sm.httpApiServer.Listen(); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/rtsp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (s *Server) OnDelRtspSubSession(session *SubSession) {
// ---------------------------------------------------------------------------------------------------------------------

func (s *Server) handleTcpConnect(conn net.Conn) {
session := NewServerCommandSession(s, conn, s.auth)
session := NewServerCommandSession(s, conn, s.auth, false, "")
s.observer.OnNewRtspSessionConnect(session)

err := session.RunLoop()
Expand Down
Loading
Loading