Skip to content

Commit

Permalink
Log more information in room callbacks (#257)
Browse files Browse the repository at this point in the history
  • Loading branch information
biglittlebigben authored Jan 14, 2025
1 parent 9df7dc8 commit 8224e7f
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 16 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564
github.com/livekit/protocol v1.30.1-0.20250108225018-c533a46256c2
github.com/livekit/psrpc v0.6.1-0.20241018124827-1efff3d113a8
github.com/livekit/server-sdk-go/v2 v2.4.2-0.20250108162600-c551c7c10e15
github.com/livekit/server-sdk-go/v2 v2.4.2-0.20250110074101-30b57947c1c4
github.com/livekit/sipgo v0.13.2-0.20241209123643-27500ef99c39
github.com/mjibson/go-dsp v0.0.0-20180508042940-11479a337f12
github.com/ory/dockertest/v3 v3.10.0
Expand All @@ -29,7 +29,7 @@ require (
github.com/urfave/cli/v2 v2.27.2
github.com/zaf/resample v1.5.0
golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67
google.golang.org/protobuf v1.36.1
google.golang.org/protobuf v1.36.2
gopkg.in/hraban/opus.v2 v2.0.0-20230925203106-0188a62cb302
gopkg.in/yaml.v3 v3.0.1
)
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ github.com/livekit/protocol v1.30.1-0.20250108225018-c533a46256c2 h1:0WsFt4+yn35
github.com/livekit/protocol v1.30.1-0.20250108225018-c533a46256c2/go.mod h1:08wT2rI6GecTCwh9n8OA28Gb7ZQNtIR+hX/LccP1TaY=
github.com/livekit/psrpc v0.6.1-0.20241018124827-1efff3d113a8 h1:Ibh0LoFl5NW5a1KFJEE0eLxxz7dqqKmYTj/BfCb0PbY=
github.com/livekit/psrpc v0.6.1-0.20241018124827-1efff3d113a8/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/livekit/server-sdk-go/v2 v2.4.2-0.20250108162600-c551c7c10e15 h1:4Nku70OAMcXZwiE0OAyQXwjSSuvhpykeN2I3fm9SQHs=
github.com/livekit/server-sdk-go/v2 v2.4.2-0.20250108162600-c551c7c10e15/go.mod h1:9TSaPKBrbTO38TLJPYtdiKTrI1e0VXWrl/y7Ye9Cp4M=
github.com/livekit/server-sdk-go/v2 v2.4.2-0.20250110074101-30b57947c1c4 h1:/bYM2bv5vjdFOoeZ5e3FFMaIeNzVFsHBzWG+/9dPwJQ=
github.com/livekit/server-sdk-go/v2 v2.4.2-0.20250110074101-30b57947c1c4/go.mod h1:w7GnR750YKsx8yG8kpBoUlO1Y2p/0nPqdGMP4XSoLYQ=
github.com/livekit/sipgo v0.13.2-0.20241209123643-27500ef99c39 h1:Lm1cv4AlKKvprrjxsg7ilnzA3XC6ivxqLGAqTJkBdcM=
github.com/livekit/sipgo v0.13.2-0.20241209123643-27500ef99c39/go.mod h1:nbNi0IsYn4tyY2ab7Rafvifty07miHYvgedPMKWbaI4=
github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o=
Expand Down Expand Up @@ -360,8 +360,8 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20241219192143-6b3ec007d9bb h1:
google.golang.org/genproto/googleapis/rpc v0.0.0-20241219192143-6b3ec007d9bb/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA=
google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU=
google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk=
google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
google.golang.org/protobuf v1.36.2 h1:R8FeyR1/eLmkutZOM5CWghmo5itiG9z0ktFlTVLuTmU=
google.golang.org/protobuf v1.36.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
Expand Down
46 changes: 36 additions & 10 deletions pkg/sip/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,17 @@ type ParticipantInfo struct {

type Room struct {
log logger.Logger
roomLog logger.Logger // deferred logger
room *lksdk.Room
mix *mixer.Mixer
out *media.SwitchWriter
outDtmf atomic.Pointer[dtmf.Writer]
p ParticipantInfo
ready atomic.Bool
ready core.Fuse
subscribe atomic.Bool
subscribed core.Fuse
stopped core.Fuse
closed core.Fuse
}

type ParticipantConfig struct {
Expand All @@ -75,6 +77,21 @@ type RoomConfig struct {
func NewRoom(log logger.Logger) *Room {
r := &Room{log: log, out: media.NewSwitchWriter(RoomSampleRate)}
r.mix = mixer.NewMixer(r.out, rtp.DefFrameDur)

roomLog, resolve := log.WithDeferredValues()
r.roomLog = roomLog

go func() {
select {
case <-r.ready.Watch():
resolve("room", r.room.Name(), "roomID", r.room.SID())
case <-r.stopped.Watch():
resolve()
case <-r.closed.Watch():
resolve()
}
}()

return r
}

Expand All @@ -100,7 +117,7 @@ func (r *Room) Room() *lksdk.Room {
}

func (r *Room) participantJoin(rp *lksdk.RemoteParticipant) {
log := r.log.WithValues("participant", rp.Identity())
log := r.roomLog.WithValues("participant", rp.Identity(), "pID", rp.SID())
log.Debugw("participant joined")
switch rp.Kind() {
case lksdk.ParticipantSIP:
Expand All @@ -112,8 +129,13 @@ func (r *Room) participantJoin(rp *lksdk.RemoteParticipant) {
}
}

func (r *Room) participantLeft(rp *lksdk.RemoteParticipant) {
log := r.roomLog.WithValues("participant", rp.Identity(), "pID", rp.SID())
log.Debugw("participant left")
}

func (r *Room) subscribeTo(pub *lksdk.RemoteTrackPublication, rp *lksdk.RemoteParticipant) {
log := r.log.WithValues("participant", rp.Identity(), "track", pub.SID(), "trackName", pub.Name())
log := r.roomLog.WithValues("participant", rp.Identity(), "pID", rp.SID(), "trackID", pub.SID(), "trackName", pub.Name())
if pub.Kind() != lksdk.TrackKindAudio {
log.Debugw("skipping non-audio track")
return
Expand All @@ -138,25 +160,28 @@ func (r *Room) Connect(conf *config.Config, rconf RoomConfig) error {
}
roomCallback := &lksdk.RoomCallback{
OnParticipantConnected: func(rp *lksdk.RemoteParticipant) {
log := r.log.WithValues("participant", rp.Identity())
log := r.roomLog.WithValues("participant", rp.Identity(), "pID", rp.SID())
if !r.subscribe.Load() {
log.Debugw("skipping participant join event - subscribed flag not set")
return // will subscribe later
}
r.participantJoin(rp)
},
OnParticipantDisconnected: func(rp *lksdk.RemoteParticipant) {
r.participantLeft(rp)
},
ParticipantCallback: lksdk.ParticipantCallback{
OnTrackPublished: func(pub *lksdk.RemoteTrackPublication, rp *lksdk.RemoteParticipant) {
log := r.log.WithValues("participant", rp.Identity(), "track", pub.SID(), "trackName", pub.Name())
log := r.roomLog.WithValues("participant", rp.Identity(), "pID", rp.SID(), "trackID", pub.SID(), "trackName", pub.Name())
if !r.subscribe.Load() {
log.Debugw("skipping track publish event - subscribed flag not set")
return // will subscribe later
}
r.subscribeTo(pub, rp)
},
OnTrackSubscribed: func(track *webrtc.TrackRemote, pub *lksdk.RemoteTrackPublication, rp *lksdk.RemoteParticipant) {
log := r.log.WithValues("participant", rp.Identity(), "track", track.ID(), "trackName", pub.Name())
if !r.ready.Load() {
log := r.roomLog.WithValues("participant", rp.Identity(), "pID", rp.SID(), "trackID", track.ID(), "trackName", pub.Name())
if !r.ready.IsBroken() {
log.Warnw("ignoring track, room not ready", nil)
return
}
Expand Down Expand Up @@ -236,7 +261,7 @@ func (r *Room) Connect(conf *config.Config, rconf RoomConfig) error {
r.p.ID = r.room.LocalParticipant.SID()
r.p.Identity = r.room.LocalParticipant.Identity()
room.LocalParticipant.SetAttributes(partConf.Attributes)
r.ready.Store(true)
r.ready.Break()
r.subscribe.Store(false) // already false, but keep for visibility

// Not subscribing to any tracks just yet!
Expand Down Expand Up @@ -316,7 +341,8 @@ func (r *Room) CloseWithReason(reason livekit.DisconnectReason) error {
if r == nil {
return nil
}
r.ready.Store(false)

r.closed.Break()
r.subscribe.Store(false)
err := r.CloseOutput()
r.SetDTMFOutput(nil)
Expand Down Expand Up @@ -358,7 +384,7 @@ func (r *Room) NewParticipantTrack(sampleRate int) (media.WriteCloser[media.PCM1
}

func (r *Room) SendData(data lksdk.DataPacket, opts ...lksdk.DataPublishOption) error {
if r == nil || !r.ready.Load() {
if r == nil || !r.ready.IsBroken() || r.closed.IsBroken() {
return nil
}
return r.room.LocalParticipant.PublishDataPacket(data, opts...)
Expand Down

0 comments on commit 8224e7f

Please sign in to comment.