Skip to content

Commit

Permalink
API changes for v2, including protocol 12 (livekit#392)
Browse files Browse the repository at this point in the history
* API changes for v2, including protocol 12

* Tracks -> TrackPublications
CreateRoom -> NewRoom

* remove funk

* consolidate cleanup

* fix sidReady panic
  • Loading branch information
davidzhao authored Feb 2, 2024
1 parent 1359bee commit 4dc2d34
Show file tree
Hide file tree
Showing 14 changed files with 317 additions and 209 deletions.
4 changes: 2 additions & 2 deletions callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type ParticipantCallback struct {
OnTrackSubscriptionFailed func(sid string, rp *RemoteParticipant)
OnTrackPublished func(publication *RemoteTrackPublication, rp *RemoteParticipant)
OnTrackUnpublished func(publication *RemoteTrackPublication, rp *RemoteParticipant)
OnDataReceived func(data []byte, rp *RemoteParticipant)
OnDataReceived func(data []byte, params DataReceiveParams)
}

func NewParticipantCallback() *ParticipantCallback {
Expand All @@ -49,7 +49,7 @@ func NewParticipantCallback() *ParticipantCallback {
OnTrackSubscriptionFailed: func(sid string, rp *RemoteParticipant) {},
OnTrackPublished: func(publication *RemoteTrackPublication, rp *RemoteParticipant) {},
OnTrackUnpublished: func(publication *RemoteTrackPublication, rp *RemoteParticipant) {},
OnDataReceived: func(data []byte, rp *RemoteParticipant) {},
OnDataReceived: func(data []byte, params DataReceiveParams) {},
}
}

Expand Down
33 changes: 33 additions & 0 deletions data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package lksdk

type dataPublishOptions struct {
Reliable bool
DestinationIdentities []string
Topic string
}

type DataReceiveParams struct {
Sender *RemoteParticipant
SenderIdentity string
Topic string
}

type DataPublishOption func(*dataPublishOptions)

func WithDataPublishTopic(topic string) DataPublishOption {
return func(o *dataPublishOptions) {
o.Topic = topic
}
}

func WithDataPublishReliable(reliable bool) DataPublishOption {
return func(o *dataPublishOptions) {
o.Reliable = reliable
}
}

func WithDataPublishDestination(identities []string) DataPublishOption {
return func(o *dataPublishOptions) {
o.DestinationIdentities = identities
}
}
73 changes: 46 additions & 27 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,23 +54,22 @@ type RTCEngine struct {

url string
token atomic.String
connParams *ConnectParams
connParams *connectParams

JoinTimeout time.Duration

// callbacks
OnDisconnected func()
OnMediaTrack func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver)
OnParticipantUpdate func([]*livekit.ParticipantInfo)
OnActiveSpeakersChanged func([]*livekit.SpeakerInfo)
OnSpeakersChanged func([]*livekit.SpeakerInfo)
OnDataReceived func(userPacket *livekit.UserPacket)
OnConnectionQuality func([]*livekit.ConnectionQualityInfo)
OnRoomUpdate func(room *livekit.Room)
OnRestarting func()
OnRestarted func(*livekit.JoinResponse)
OnResuming func()
OnResumed func()
OnDisconnected func()
OnMediaTrack func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver)
OnParticipantUpdate func([]*livekit.ParticipantInfo)
OnSpeakersChanged func([]*livekit.SpeakerInfo)
OnDataReceived func(userPacket *livekit.UserPacket)
OnConnectionQuality func([]*livekit.ConnectionQualityInfo)
OnRoomUpdate func(room *livekit.Room)
OnRestarting func()
OnRestarted func(*livekit.JoinResponse)
OnResuming func()
OnResumed func()
}

func NewRTCEngine() *RTCEngine {
Expand Down Expand Up @@ -110,8 +109,8 @@ func NewRTCEngine() *RTCEngine {
return e
}

func (e *RTCEngine) Join(url string, token string, params *ConnectParams) (*livekit.JoinResponse, error) {
res, err := e.client.Join(url, token, params)
func (e *RTCEngine) Join(url string, token string, params *connectParams) (*livekit.JoinResponse, error) {
res, err := e.client.Join(url, token, *params)
if err != nil {
return nil, err
}
Expand All @@ -120,7 +119,8 @@ func (e *RTCEngine) Join(url string, token string, params *ConnectParams) (*live
e.token.Store(token)
e.connParams = params

if err = e.configure(res); err != nil {
err = e.configure(res.IceServers, res.ClientConfiguration, proto.Bool(res.SubscriberPrimary))
if err != nil {
return nil, err
}

Expand Down Expand Up @@ -178,12 +178,27 @@ func (e *RTCEngine) setRTT(rtt uint32) {
}
}

func (e *RTCEngine) configure(res *livekit.JoinResponse) error {
iceServers := FromProtoIceServers(res.IceServers)
configuration := webrtc.Configuration{ICEServers: iceServers}
if res.GetClientConfiguration().GetForceRelay() == livekit.ClientConfigSetting_ENABLED {
func (e *RTCEngine) configure(
iceServers []*livekit.ICEServer,
clientConfig *livekit.ClientConfiguration,
subscriberPrimary *bool) error {
rtcICEServers := FromProtoIceServers(iceServers)
configuration := webrtc.Configuration{ICEServers: rtcICEServers}
if clientConfig != nil &&
clientConfig.GetForceRelay() == livekit.ClientConfigSetting_ENABLED {
configuration.ICETransportPolicy = webrtc.ICETransportPolicyRelay
}

// remove previous transport
if e.publisher != nil {
e.publisher.Close()
e.publisher = nil
}
if e.subscriber != nil {
e.subscriber.Close()
e.subscriber = nil
}

var err error
if e.publisher, err = NewPCTransport(PCTransportParams{
Configuration: configuration,
Expand All @@ -201,7 +216,9 @@ func (e *RTCEngine) configure(res *livekit.JoinResponse) error {
}
logger.Debugw("Using ICE servers", "servers", iceServers)

e.subscriberPrimary = res.SubscriberPrimary
if subscriberPrimary != nil {
e.subscriberPrimary = *subscriberPrimary
}
e.subscriber.OnRemoteDescriptionSettled(e.createPublisherAnswerAndSend)

e.publisher.pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
Expand Down Expand Up @@ -235,7 +252,7 @@ func (e *RTCEngine) configure(res *livekit.JoinResponse) error {
})

primaryTransport := e.publisher
if res.SubscriberPrimary {
if e.subscriberPrimary {
primaryTransport = e.subscriber
}
primaryTransport.pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
Expand Down Expand Up @@ -411,10 +428,6 @@ func (e *RTCEngine) handleDataPacket(msg webrtc.DataChannelMessage) {
return
}
switch msg := packet.Value.(type) {
case *livekit.DataPacket_Speaker:
if e.OnActiveSpeakersChanged != nil {
e.OnActiveSpeakersChanged(msg.Speaker.Speakers)
}
case *livekit.DataPacket_User:
if e.OnDataReceived != nil {
e.OnDataReceived(msg.User)
Expand Down Expand Up @@ -489,11 +502,17 @@ func (e *RTCEngine) handleDisconnect(fullReconnect bool) {
}

func (e *RTCEngine) resumeConnection() error {
_, err := e.client.Join(e.url, e.token.Load(), &ConnectParams{Reconnect: true})
reconnect, err := e.client.Reconnect(e.url, e.token.Load(), *e.connParams)
if err != nil {
return err
}

if reconnect != nil {
err := e.configure(reconnect.IceServers, reconnect.ClientConfiguration, nil)
if err != nil {
return err
}
}
e.client.Start()

// send offer if publisher enabled
Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ require (
github.com/pion/sdp/v3 v3.0.6
github.com/pion/webrtc/v3 v3.2.24
github.com/stretchr/testify v1.8.4
github.com/thoas/go-funk v0.9.3
github.com/twitchtv/twirp v8.1.3+incompatible
go.uber.org/atomic v1.11.0
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a
google.golang.org/protobuf v1.32.0
)

Expand Down Expand Up @@ -66,7 +66,6 @@ require (
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a // indirect
golang.org/x/net v0.20.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.16.0 // indirect
Expand Down
3 changes: 0 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZ
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
Expand All @@ -168,8 +167,6 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o
github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/thoas/go-funk v0.9.3 h1:7+nAEx3kn5ZJcnDm2Bh23N2yOtweO14bi//dvRtgLpw=
github.com/thoas/go-funk v0.9.3/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q=
github.com/twitchtv/twirp v8.1.3+incompatible h1:+F4TdErPgSUbMZMwp13Q/KgDVuI7HJXP61mNV3/7iuU=
github.com/twitchtv/twirp v8.1.3+incompatible/go.mod h1:RRJoFSAmTEh2weEqWtpPE3vFK5YBhA6bqp2l1kfCC5A=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
8 changes: 4 additions & 4 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func TestJoin(t *testing.T) {

subCB := &RoomCallback{
ParticipantCallback: ParticipantCallback{
OnDataReceived: func(data []byte, rp *RemoteParticipant) {
OnDataReceived: func(data []byte, params DataReceiveParams) {
dataLock.Lock()
receivedData = string(data)
dataLock.Unlock()
Expand All @@ -123,7 +123,7 @@ func TestJoin(t *testing.T) {
require.NotNil(t, serverInfo)
require.Equal(t, serverInfo.Edition, livekit.ServerInfo_Standard)

pub.LocalParticipant.PublishData([]byte("test"), livekit.DataPacket_RELIABLE, nil)
pub.LocalParticipant.PublishData([]byte("test"), WithDataPublishReliable(true))
localPub := pubNullTrack(t, pub, audioTrackName)
require.Equal(t, localPub.Name(), audioTrackName)

Expand Down Expand Up @@ -203,7 +203,7 @@ func TestForceTLS(t *testing.T) {
require.NoError(t, err)

// ensure publisher connected
pub.LocalParticipant.PublishData([]byte("test"), livekit.DataPacket_RELIABLE, nil)
pub.LocalParticipant.PublishData([]byte("test"), WithDataPublishReliable(true))

pub.Simulate(SimulateForceTLS)
require.Eventually(t, func() bool { return reconnected.Load() && pub.engine.ensurePublisherConnected(true) == nil }, 15*time.Second, 100*time.Millisecond)
Expand Down Expand Up @@ -249,7 +249,7 @@ func TestSubscribeMutedTrack(t *testing.T) {
var trackReceived atomic.Int32

var pubTrackMuted sync.WaitGroup
require.NoError(t, pub.LocalParticipant.PublishData([]byte("test"), livekit.DataPacket_RELIABLE, nil))
require.NoError(t, pub.LocalParticipant.PublishData([]byte("test"), WithDataPublishReliable(true)))

pubMuteTrack := func(t *testing.T, room *Room, name string, codec webrtc.RTPCodecCapability) *LocalTrackPublication {
pubTrackMuted.Add(1)
Expand Down
40 changes: 22 additions & 18 deletions localparticipant.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func (p *LocalParticipant) PublishTrack(track webrtc.TrackLocal, opts *TrackPubl
Height: uint32(opts.VideoHeight),
DisableDtx: opts.DisableDTX,
Stereo: opts.Stereo,
Stream: opts.Stream,
}
if kind == TrackKindVideo {
// single layer
Expand Down Expand Up @@ -258,16 +259,32 @@ func (p *LocalParticipant) closeTracks() {
}
}

func (p *LocalParticipant) PublishDataPacket(userPacket *livekit.UserPacket, kind livekit.DataPacket_Kind) error {
if userPacket == nil {
return ErrInvalidParameter
func (p *LocalParticipant) PublishData(
payload []byte,
opts ...DataPublishOption,
) error {
options := &dataPublishOptions{}
for _, opt := range opts {
opt(options)
}
packet := &livekit.UserPacket{
Payload: payload,
DestinationIdentities: options.DestinationIdentities,
}
if options.Topic != "" {
packet.Topic = proto.String(options.Topic)
}
dataPacket := &livekit.DataPacket{
Kind: kind,
Value: &livekit.DataPacket_User{
User: userPacket,
User: packet,
},
}
if options.Reliable {
dataPacket.Kind = livekit.DataPacket_RELIABLE
} else {
dataPacket.Kind = livekit.DataPacket_LOSSY
}

if err := p.engine.ensurePublisherConnected(true); err != nil {
return err
}
Expand All @@ -280,19 +297,6 @@ func (p *LocalParticipant) PublishDataPacket(userPacket *livekit.UserPacket, kin
return p.engine.GetDataChannel(dataPacket.Kind).Send(encoded)
}

func (p *LocalParticipant) PublishData(
data []byte,
kind livekit.DataPacket_Kind,
destinationSids []string,
) error {
packet := &livekit.UserPacket{
Payload: data,
DestinationSids: destinationSids,
}

return p.PublishDataPacket(packet, kind)
}

func (p *LocalParticipant) UnpublishTrack(sid string) error {
obj, loaded := p.tracks.LoadAndDelete(sid)
if !loaded {
Expand Down
Loading

0 comments on commit 4dc2d34

Please sign in to comment.