Skip to content

Commit

Permalink
feat(sfu): add join config to peers (#471)
Browse files Browse the repository at this point in the history
  • Loading branch information
OrlandoCo authored Mar 17, 2021
1 parent 7359f9d commit 54cd1c2
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 67 deletions.
146 changes: 82 additions & 64 deletions pkg/sfu/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ var (
ErrOfferIgnored = errors.New("offered ignored")
)

// JoinConfig allow adding more control to the peers joining a session.
type JoinConfig struct {
// If true the peer will not be allowed to publish tracks to session.
NoPublish bool
// If true the peer will not be allowed to subscribe to other peers in session.
NoSubscribe bool
}

// SessionProvider provides the session to the sfu.Peer{}
// This allows the sfu.SFU{} implementation to be customized / wrapped by another package
type SessionProvider interface {
Expand Down Expand Up @@ -57,8 +65,13 @@ func NewPeer(provider SessionProvider) *Peer {
}

// Join initializes this peer for a given sessionID
func (p *Peer) Join(sid, uid string) error {
if p.publisher != nil {
func (p *Peer) Join(sid, uid string, config ...JoinConfig) error {
var conf JoinConfig
if len(config) > 0 {
conf = config[0]
}

if p.session != nil {
Logger.V(1).Info("peer already exists", "session_id", sid, "peer_id", p.id, "publisher_id", p.publisher.id)
return ErrTransportExists
}
Expand All @@ -74,80 +87,85 @@ func (p *Peer) Join(sid, uid string) error {

p.session, cfg = p.provider.GetSession(sid)

p.subscriber, err = NewSubscriber(uid, cfg)
if err != nil {
return fmt.Errorf("error creating transport: %v", err)
}
p.publisher, err = NewPublisher(p.session, uid, cfg)
if err != nil {
return fmt.Errorf("error creating transport: %v", err)
}

for _, dc := range p.session.datachannels {
if err := p.subscriber.AddDatachannel(p, dc); err != nil {
return fmt.Errorf("error setting subscriber default dc datachannel")
}
}

p.subscriber.OnNegotiationNeeded(func() {
p.Lock()
defer p.Unlock()

if p.remoteAnswerPending {
p.negotiationPending = true
return
}

Logger.V(1).Info("Negotiation needed", "peer_id", p.id)
offer, err := p.subscriber.CreateOffer()
if !conf.NoSubscribe {
p.subscriber, err = NewSubscriber(uid, cfg)
if err != nil {
Logger.Error(err, "CreateOffer error")
return
}

p.remoteAnswerPending = true
if p.OnOffer != nil && !p.closed.get() {
Logger.V(0).Info("Send offer", "peer_id", p.id)
p.OnOffer(&offer)
return fmt.Errorf("error creating transport: %v", err)
}
})

p.subscriber.OnICECandidate(func(c *webrtc.ICECandidate) {
Logger.V(1).Info("On subscriber ice candidate called for peer", "peer_id", p.id)
if c == nil {
return
}

if p.OnIceCandidate != nil && !p.closed.get() {
json := c.ToJSON()
p.OnIceCandidate(&json, subscriber)
}
})
p.subscriber.OnNegotiationNeeded(func() {
p.Lock()
defer p.Unlock()

if p.remoteAnswerPending {
p.negotiationPending = true
return
}

Logger.V(1).Info("Negotiation needed", "peer_id", p.id)
offer, err := p.subscriber.CreateOffer()
if err != nil {
Logger.Error(err, "CreateOffer error")
return
}

p.remoteAnswerPending = true
if p.OnOffer != nil && !p.closed.get() {
Logger.V(0).Info("Send offer", "peer_id", p.id)
p.OnOffer(&offer)
}
})

p.subscriber.OnICECandidate(func(c *webrtc.ICECandidate) {
Logger.V(1).Info("On subscriber ice candidate called for peer", "peer_id", p.id)
if c == nil {
return
}

if p.OnIceCandidate != nil && !p.closed.get() {
json := c.ToJSON()
p.OnIceCandidate(&json, subscriber)
}
})
}

p.publisher.OnICECandidate(func(c *webrtc.ICECandidate) {
Logger.V(1).Info("on publisher ice candidate called for peer", "peer_id", p.id)
if c == nil {
return
if !conf.NoPublish {
p.publisher, err = NewPublisher(p.session, uid, cfg)
if err != nil {
return fmt.Errorf("error creating transport: %v", err)
}

if p.OnIceCandidate != nil && !p.closed.get() {
json := c.ToJSON()
p.OnIceCandidate(&json, publisher)
for _, dc := range p.session.datachannels {
if err := p.subscriber.AddDatachannel(p, dc); err != nil {
return fmt.Errorf("error setting subscriber default dc datachannel")
}
}
})

p.publisher.OnICEConnectionStateChange(func(s webrtc.ICEConnectionState) {
if p.OnICEConnectionStateChange != nil && !p.closed.get() {
p.OnICEConnectionStateChange(s)
}
})
p.publisher.OnICECandidate(func(c *webrtc.ICECandidate) {
Logger.V(1).Info("on publisher ice candidate called for peer", "peer_id", p.id)
if c == nil {
return
}

if p.OnIceCandidate != nil && !p.closed.get() {
json := c.ToJSON()
p.OnIceCandidate(&json, publisher)
}
})

p.publisher.OnICEConnectionStateChange(func(s webrtc.ICEConnectionState) {
if p.OnICEConnectionStateChange != nil && !p.closed.get() {
p.OnICEConnectionStateChange(s)
}
})
}

p.session.AddPeer(p)

Logger.V(0).Info("Peer join session", "peer_id", p.id, "session_id", sid)

p.session.Subscribe(p)

if !conf.NoSubscribe {
p.session.Subscribe(p)
}
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/sfu/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (s *Session) AddDatachannel(owner string, dc *webrtc.DataChannel) {
s.peers[owner].subscriber.channels[label] = dc
peers := make([]*Peer, 0, len(s.peers))
for _, p := range s.peers {
if p.id == owner {
if p.id == owner || p.subscriber == nil {
continue
}
peers = append(peers, p)
Expand Down Expand Up @@ -131,7 +131,7 @@ func (s *Session) Publish(router Router, r Receiver) {

for _, p := range peers {
// Don't sub to self
if router.ID() == p.id {
if router.ID() == p.id || p.subscriber == nil {
continue
}

Expand All @@ -151,7 +151,7 @@ func (s *Session) Subscribe(peer *Peer) {
copy(fdc, s.fanOutDCs)
peers := make([]*Peer, 0, len(s.peers))
for _, p := range s.peers {
if p == peer {
if p == peer || p.publisher == nil {
continue
}
peers = append(peers, p)
Expand Down

0 comments on commit 54cd1c2

Please sign in to comment.