diff --git a/go.mod b/go.mod index 8c33e8a..413bcc9 100644 --- a/go.mod +++ b/go.mod @@ -12,16 +12,16 @@ require ( github.com/jfreymuth/oggvorbis v1.0.5 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20241128072814-c363618d4c98 - github.com/livekit/protocol v1.28.2-0.20241128072830-b738aedbd841 + github.com/livekit/protocol v1.29.5-0.20241209183753-f6b5078b2244 github.com/livekit/psrpc v0.6.1-0.20241018124827-1efff3d113a8 - github.com/livekit/server-sdk-go/v2 v2.4.0 + github.com/livekit/server-sdk-go/v2 v2.4.1-0.20241211082531-7610e1639c28 github.com/livekit/sipgo v0.13.2-0.20241209123643-27500ef99c39 github.com/mjibson/go-dsp v0.0.0-20180508042940-11479a337f12 github.com/ory/dockertest/v3 v3.10.0 github.com/pion/interceptor v0.1.37 github.com/pion/rtp v1.8.9 github.com/pion/sdp/v3 v3.0.9 - github.com/pion/webrtc/v4 v4.0.4 + github.com/pion/webrtc/v4 v4.0.5 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.20.5 github.com/sirupsen/logrus v1.9.3 diff --git a/go.sum b/go.sum index 113a315..074b071 100644 --- a/go.sum +++ b/go.sum @@ -122,12 +122,12 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20241128072814-c363618d4c98 h1:QA7DqIC/ZSsMj8HC0+zNfMMwssHbA0alZALK68r30LQ= github.com/livekit/mediatransportutil v0.0.0-20241128072814-c363618d4c98/go.mod h1:WIVFAGzVZ7VMjPC5+nbSfwdFjWcbuLgx97KeNSUDTEo= -github.com/livekit/protocol v1.28.2-0.20241128072830-b738aedbd841 h1:69dSvfL6H6odFhL9q4s+RjDRDdfLY+WUUQ/Lz0av2Bs= -github.com/livekit/protocol v1.28.2-0.20241128072830-b738aedbd841/go.mod h1:mqXSWNHbENjxM0/HG25wZ7wgja/K9fA0PeQxi+MPmWw= +github.com/livekit/protocol v1.29.5-0.20241209183753-f6b5078b2244 h1:Eg9HK+5bMCDRKhh5g5g16oyNaMbCqMrJvxFBaBuP7Vo= +github.com/livekit/protocol v1.29.5-0.20241209183753-f6b5078b2244/go.mod h1:NDg1btMpKCzr/w6QR5kDuXw/e4Y7yOBE+RUAHsc+Y/M= github.com/livekit/psrpc v0.6.1-0.20241018124827-1efff3d113a8 h1:Ibh0LoFl5NW5a1KFJEE0eLxxz7dqqKmYTj/BfCb0PbY= github.com/livekit/psrpc v0.6.1-0.20241018124827-1efff3d113a8/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= -github.com/livekit/server-sdk-go/v2 v2.4.0 h1:ide41hppBf7btHLz/nj6rLIQSkaIOxP5tVSki74ZDhg= -github.com/livekit/server-sdk-go/v2 v2.4.0/go.mod h1:0hzAkh/FegPZmXDp8Ai92ndP/mWVpBxeR5VnR3muQp4= +github.com/livekit/server-sdk-go/v2 v2.4.1-0.20241211082531-7610e1639c28 h1:LadsWjdymTEST6ny/huFg5n4IoS7suvXnSrF+RhzBqo= +github.com/livekit/server-sdk-go/v2 v2.4.1-0.20241211082531-7610e1639c28/go.mod h1:o6KTJ9UmNpurEbJBKWbHTDhBthILVG1kms5rAmKOIIY= github.com/livekit/sipgo v0.13.2-0.20241209123643-27500ef99c39 h1:Lm1cv4AlKKvprrjxsg7ilnzA3XC6ivxqLGAqTJkBdcM= github.com/livekit/sipgo v0.13.2-0.20241209123643-27500ef99c39/go.mod h1:nbNi0IsYn4tyY2ab7Rafvifty07miHYvgedPMKWbaI4= github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o= @@ -192,8 +192,8 @@ github.com/pion/transport/v3 v3.0.7 h1:iRbMH05BzSNwhILHoBoAPxoB9xQgOaJk+591KC9P1 github.com/pion/transport/v3 v3.0.7/go.mod h1:YleKiTZ4vqNxVwh77Z0zytYi7rXHl7j6uPLGhhz9rwo= github.com/pion/turn/v4 v4.0.0 h1:qxplo3Rxa9Yg1xXDxxH8xaqcyGUtbHYw4QSCvmFWvhM= github.com/pion/turn/v4 v4.0.0/go.mod h1:MuPDkm15nYSklKpN8vWJ9W2M0PlyQZqYt1McGuxG7mA= -github.com/pion/webrtc/v4 v4.0.4 h1:X+gkoBLKDsR6FliKKQ/VXGBjnMR3yOPcyXEPt3z7Ep0= -github.com/pion/webrtc/v4 v4.0.4/go.mod h1:LvP8Np5b/sM0uyJIcUPvJcCvhtjHxJwzh2H2PYzE6cQ= +github.com/pion/webrtc/v4 v4.0.5 h1:8cVPojcv3cQTwVga2vF1rzCNvkiEimnYdCCG7yF317I= +github.com/pion/webrtc/v4 v4.0.5/go.mod h1:LvP8Np5b/sM0uyJIcUPvJcCvhtjHxJwzh2H2PYzE6cQ= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/test/integration/sip_test.go b/test/integration/sip_test.go index e81411e..58973fe 100644 --- a/test/integration/sip_test.go +++ b/test/integration/sip_test.go @@ -132,7 +132,7 @@ func (s *SIPServer) CreateTrunkOut(t testing.TB, trunk *livekit.SIPOutboundTrunk if err != nil { t.Fatal(err) } - t.Log("Trunk (out):", tr.SipTrunkId) + t.Log("New trunk (outbound):", tr.SipTrunkId) return tr.SipTrunkId } @@ -144,7 +144,7 @@ func (s *SIPServer) CreateTrunkIn(t testing.TB, trunk *livekit.SIPInboundTrunkIn if err != nil { t.Fatal(err) } - t.Log("Trunk (in):", tr.SipTrunkId) + t.Log("New trunk (inbound):", tr.SipTrunkId) return tr.SipTrunkId } @@ -181,6 +181,7 @@ func (s *SIPServer) CreateTrunkAndIndividual(t testing.TB, trunk *livekit.SIPInb func (s *SIPServer) CreateDirectDispatch(t testing.TB, room, pin string, meta string, attrs map[string]string) string { ctx := context.Background() dr, err := s.Client.CreateSIPDispatchRule(ctx, &livekit.CreateSIPDispatchRuleRequest{ + Name: room, Metadata: meta, Attributes: attrs, Rule: &livekit.SIPDispatchRule{ @@ -194,7 +195,7 @@ func (s *SIPServer) CreateDirectDispatch(t testing.TB, room, pin string, meta st if err != nil { t.Fatal(err) } - t.Log("Dispatch (direct):", dr.SipDispatchRuleId) + t.Log("New dispatch rule (direct):", dr.SipDispatchRuleId) return dr.SipDispatchRuleId } @@ -357,7 +358,7 @@ func TestSIPJoinOpenRoom(t *testing.T) { Kind: livekit.ParticipantInfo_SIP, Metadata: meta, Attributes: map[string]string{ - "sip.callID": "", // special case + "sip.callID": lktest.AttrTestAny, // special case "sip.callStatus": "active", "sip.trunkPhoneNumber": serverNumber, "sip.phoneNumber": clientNumber, @@ -519,7 +520,7 @@ func TestSIPJoinPinRoom(t *testing.T) { Kind: livekit.ParticipantInfo_SIP, Metadata: meta, Attributes: map[string]string{ - "sip.callID": "", // special case + "sip.callID": lktest.AttrTestAny, // special case "sip.callStatus": "active", "sip.trunkPhoneNumber": serverNumber, "sip.phoneNumber": clientNumber, @@ -592,7 +593,7 @@ func TestSIPJoinPinRoom(t *testing.T) { Kind: livekit.ParticipantInfo_SIP, Metadata: meta, Attributes: map[string]string{ - "sip.callID": "", // special case + "sip.callID": lktest.AttrTestAny, // special case "sip.callStatus": "active", "sip.trunkPhoneNumber": serverNumber, "sip.phoneNumber": clientNumber, @@ -649,7 +650,7 @@ func TestSIPJoinOpenRoomWithPin(t *testing.T) { Kind: livekit.ParticipantInfo_SIP, Metadata: meta, Attributes: map[string]string{ - "sip.callID": "", // special case + "sip.callID": lktest.AttrTestAny, // special case "sip.callStatus": "active", "sip.trunkPhoneNumber": serverNumber, "sip.phoneNumber": clientNumber, @@ -718,7 +719,7 @@ func TestSIPJoinRoomIndividual(t *testing.T) { Kind: livekit.ParticipantInfo_SIP, Metadata: meta, Attributes: map[string]string{ - "sip.callID": "", // special case + "sip.callID": lktest.AttrTestAny, // special case "sip.callStatus": "active", "sip.trunkPhoneNumber": serverNumber, "sip.phoneNumber": clientNumber, @@ -793,7 +794,7 @@ func TestSIPAudio(t *testing.T) { Kind: livekit.ParticipantInfo_SIP, Metadata: meta, Attributes: map[string]string{ - "sip.callID": "", // special case + "sip.callID": lktest.AttrTestAny, // special case "sip.callStatus": "active", "sip.trunkPhoneNumber": serverNumber, "sip.phoneNumber": fmt.Sprintf("+%d", 111111111*(i+1)), @@ -845,7 +846,6 @@ func TestSIPOutbound(t *testing.T) { userName = "test-user" userPass = "test-pass" roomPin = "*1234" - dtmfPin = "ww*12w34ww#" // with added delays meta = `{"test":true}` ) @@ -863,15 +863,16 @@ func TestSIPOutbound(t *testing.T) { headersIn := map[string]string{ "X-LK-From-1": "inbound", } - roomPin, dtmfPin := roomPin, dtmfPin + roomPin := roomPin if withPin { // We cannot set headers because of the PIN. See TestSIPJoinPinRoom for details. delete(headersIn, "X-LK-From-1") } else { - roomPin, dtmfPin = "", "" + roomPin = "" } // Configure Trunk for inbound server. trunkIn := srvIn.CreateTrunkIn(t, &livekit.SIPInboundTrunkInfo{ + Name: "Test In", Numbers: []string{serverNumber}, AuthUsername: userName, AuthPassword: userPass, @@ -890,6 +891,7 @@ func TestSIPOutbound(t *testing.T) { // Configure Trunk for outbound server and make a SIP call. trunkOut := srvOut.CreateTrunkOut(t, &livekit.SIPOutboundTrunkInfo{ + Name: "Test Out", Numbers: []string{clientNumber}, Address: srvIn.Address, Transport: tr, @@ -923,18 +925,12 @@ func TestSIPOutbound(t *testing.T) { // Running sub test here is important, because TestSIPOutbound registers Cleanup funcs. t.Run(fmt.Sprintf("run %d", i+1), func(t *testing.T) { lktest.TestSIPOutbound(t, ctx, lkOut.LiveKit, lkIn.LiveKit, lktest.SIPOutboundTestParams{ - TrunkOut: trunkOut, - NumberOut: clientNumber, - RoomOut: "outbound", - TrunkIn: trunkIn, - RuleIn: ruleIn, - NumberIn: serverNumber, - RoomIn: roomIn, - RoomPin: dtmfPin, - MetaIn: meta, - AttrsIn: expAttrsIn, - AttrsOut: expAttrsOut, - TestDMTF: true, + TrunkOut: trunkOut, + RoomOut: "outbound", + TrunkIn: trunkIn, + RuleIn: ruleIn, + AttrsIn: expAttrsIn, + AttrsOut: expAttrsOut, }) }) } diff --git a/test/lktest/livekit.go b/test/lktest/livekit.go index 8ce3999..869cbc5 100644 --- a/test/lktest/livekit.go +++ b/test/lktest/livekit.go @@ -80,7 +80,7 @@ func (lk *LiveKit) RoomParticipants(t TB, room string) []*livekit.ParticipantInf return resp.Participants } -func (lk *LiveKit) CreateSIPParticipant(t TB, req *livekit.CreateSIPParticipantRequest) { +func (lk *LiveKit) CreateSIPParticipant(t TB, req *livekit.CreateSIPParticipantRequest) *livekit.SIPParticipantInfo { r, err := lk.SIP.CreateSIPParticipant(context.Background(), req) if err != nil { t.Fatal(err) @@ -92,6 +92,7 @@ func (lk *LiveKit) CreateSIPParticipant(t TB, req *livekit.CreateSIPParticipantR Room: req.RoomName, Identity: r.ParticipantIdentity, }) }) + return r } func (lk *LiveKit) Connect(t TB, room, identity string, cb *lksdk.RoomCallback) *lksdk.Room { @@ -141,9 +142,17 @@ func (lk *LiveKit) ConnectWithAudio(t TB, room, identity string, cb *lksdk.RoomC return r } -func (lk *LiveKit) ConnectParticipant(t TB, room, identity string, cb *lksdk.RoomCallback) *Participant { +type RoomParticipantCallback struct { + lksdk.RoomCallback + OnSIPStatus func(p *lksdk.RemoteParticipant, callID string, status string) +} + +func (lk *LiveKit) ConnectParticipant(t TB, room, identity string, cb *RoomParticipantCallback) *Participant { + var origCB lksdk.RoomCallback if cb == nil { - cb = new(lksdk.RoomCallback) + cb = new(RoomParticipantCallback) + } else { + origCB = cb.RoomCallback } p := &Participant{t: t} pr, pw := media.Pipe[media.PCM16Sample](RoomSampleRate) @@ -153,14 +162,17 @@ func (lk *LiveKit) ConnectParticipant(t TB, room, identity string, cb *lksdk.Roo }) p.AudioIn = pr p.mixIn = mixer.NewMixer(pw, rtp.DefFrameDur) - cb.ParticipantCallback.OnTrackPublished = func(pub *lksdk.RemoteTrackPublication, rp *lksdk.RemoteParticipant) { + cb.OnTrackPublished = func(pub *lksdk.RemoteTrackPublication, rp *lksdk.RemoteParticipant) { if pub.Kind() == lksdk.TrackKindAudio { if err := pub.SetSubscribed(true); err != nil { t.Error("cannot subscribe to the track", pub.SID(), err) } } + if origCB.OnTrackPublished != nil { + origCB.OnTrackPublished(pub, rp) + } } - cb.ParticipantCallback.OnTrackSubscribed = func(track *webrtc.TrackRemote, pub *lksdk.RemoteTrackPublication, rp *lksdk.RemoteParticipant) { + cb.OnTrackSubscribed = func(track *webrtc.TrackRemote, pub *lksdk.RemoteTrackPublication, rp *lksdk.RemoteParticipant) { inp := p.mixIn.NewInput() defer inp.Close() @@ -173,14 +185,50 @@ func (lk *LiveKit) ConnectParticipant(t TB, room, identity string, cb *lksdk.Roo h := rtp.NewMediaStreamIn[opus.Sample](odec) _ = rtp.HandleLoop(track, h) } + cb.OnParticipantConnected = func(p *lksdk.RemoteParticipant) { + if origCB.OnParticipantConnected != nil { + origCB.OnParticipantConnected(p) + } + switch p.Kind() { + case lksdk.ParticipantSIP: + if cb.OnSIPStatus != nil { + callID := p.Attributes()[livekit.AttrSIPCallID] + status := p.Attributes()[livekit.AttrSIPCallStatus] + cb.OnSIPStatus(p, callID, status) + } + } + } + cb.OnParticipantDisconnected = func(p *lksdk.RemoteParticipant) { + if origCB.OnParticipantDisconnected != nil { + origCB.OnParticipantDisconnected(p) + } + switch p.Kind() { + case lksdk.ParticipantSIP: + if cb.OnSIPStatus != nil { + callID := p.Attributes()[livekit.AttrSIPCallID] + status := p.Attributes()[livekit.AttrSIPCallStatus] + if status == "" { + status = "disconnect-unk" + } + cb.OnSIPStatus(p, callID, status) + } + } + } cb.OnAttributesChanged = func(changed map[string]string, p lksdk.Participant) { - name := "" - if p != nil { - name = p.Name() + if origCB.OnAttributesChanged != nil { + origCB.OnAttributesChanged(changed, p) + } + switch p.Kind() { + case lksdk.ParticipantSIP: + rp, _ := p.(*lksdk.RemoteParticipant) + if rp != nil && cb.OnSIPStatus != nil { + callID := p.Attributes()[livekit.AttrSIPCallID] + status := p.Attributes()[livekit.AttrSIPCallStatus] + cb.OnSIPStatus(rp, callID, status) + } } - t.Logf("attributes changed: %s: %v", name, changed) } - p.Room = lk.Connect(t, room, identity, cb) + p.Room = lk.Connect(t, room, identity, &cb.RoomCallback) for _, rp := range p.Room.GetRemoteParticipants() { for _, pub := range rp.TrackPublications() { cb.ParticipantCallback.OnTrackPublished(pub.(*lksdk.RemoteTrackPublication), rp) @@ -394,7 +442,7 @@ func compareParticipants(t TB, exp *ParticipantInfo, got *livekit.ParticipantInf return nil } -func (lk *LiveKit) ExpectParticipants(t TB, ctx context.Context, room string, participants []ParticipantInfo) { +func (lk *LiveKit) ExpectParticipants(t TB, ctx context.Context, room string, participants []ParticipantInfo) []*livekit.ParticipantInfo { slices.SortFunc(participants, func(a, b ParticipantInfo) int { return strings.Compare(a.Identity, b.Identity) }) @@ -407,7 +455,7 @@ wait: select { case <-ctx.Done(): require.Len(t, list, len(participants), "timeout waiting for participants") - return + return nil case <-ticker.C: continue wait } @@ -421,13 +469,13 @@ wait: select { case <-ctx.Done(): require.NoError(t, err) - return + return nil case <-ticker.C: continue wait } } } - return // all good + return list // all good } } @@ -463,18 +511,18 @@ func (lk *LiveKit) waitRooms(t TB, ctx context.Context, none bool, filter func(r } } -func (lk *LiveKit) ExpectRoomWithParticipants(t TB, ctx context.Context, room string, participants []ParticipantInfo) { +func (lk *LiveKit) ExpectRoomWithParticipants(t TB, ctx context.Context, room string, participants []ParticipantInfo) []*livekit.ParticipantInfo { filter := func(r *livekit.Room) bool { return r.Name == room } rooms := lk.waitRooms(t, ctx, len(participants) == 0, filter) if len(participants) == 0 && len(rooms) == 0 { - return + return nil } require.Len(t, rooms, 1) require.True(t, filter(rooms[0])) - lk.ExpectParticipants(t, ctx, room, participants) + return lk.ExpectParticipants(t, ctx, room, participants) } func (lk *LiveKit) ExpectRoomPref(t TB, ctx context.Context, pref, number string, none bool) *livekit.Room { @@ -488,7 +536,7 @@ func (lk *LiveKit) ExpectRoomPref(t TB, ctx context.Context, pref, number string return rooms[0] } -func (lk *LiveKit) ExpectRoomPrefWithParticipants(t TB, ctx context.Context, pref, number string, participants []ParticipantInfo) { +func (lk *LiveKit) ExpectRoomPrefWithParticipants(t TB, ctx context.Context, pref, number string, participants []ParticipantInfo) []*livekit.ParticipantInfo { room := lk.ExpectRoomPref(t, ctx, pref, number, len(participants) != 0) - lk.ExpectParticipants(t, ctx, room.Name, participants) + return lk.ExpectParticipants(t, ctx, room.Name, participants) } diff --git a/test/lktest/sip.go b/test/lktest/sip.go index 38770cd..25d5d2a 100644 --- a/test/lktest/sip.go +++ b/test/lktest/sip.go @@ -19,6 +19,8 @@ import ( "maps" "slices" "strings" + "sync" + "sync/atomic" "github.com/stretchr/testify/require" @@ -27,6 +29,8 @@ import ( lksdk "github.com/livekit/server-sdk-go/v2" ) +const AttrTestAny = "" + func checkSIPAttrs(t TB, exp, got map[string]string) (_, _ map[string]string) { exp, got = maps.Clone(exp), maps.Clone(got) @@ -36,7 +40,8 @@ func checkSIPAttrs(t TB, exp, got map[string]string) (_, _ map[string]string) { livekit.AttrSIPPrefix + "callIDFull", livekit.AttrSIPPrefix + "callTag", } { - if _, ok := exp[a]; !ok { + expVal, ok := exp[a] + if !ok { continue } v, ok := got[a] @@ -51,6 +56,9 @@ func checkSIPAttrs(t TB, exp, got map[string]string) (_, _ map[string]string) { case livekit.AttrSIPCallID: require.True(t, strings.HasPrefix(v, guid.SIPCallPrefix)) } + if expVal != "" && expVal != AttrTestAny { + require.Equal(t, expVal, v) + } delete(exp, a) delete(got, a) } @@ -67,105 +75,224 @@ func checkSIPAttrs(t TB, exp, got map[string]string) (_, _ map[string]string) { } type SIPOutboundTestParams struct { - TrunkOut string // trunk ID for outbound call - NumberOut string // number to call fom - RoomOut string // room for outbound call - IdentityOut string - AttrsOut map[string]string // expected attributes for outbound participants - TrunkIn string // trunk ID for inbound call - RuleIn string // rule ID for inbound call - NumberIn string // number to call to - RoomIn string // room for inbound call - RoomPin string // room pin for inbound call - MetaIn string // expected metadata for inbound participants - AttrsIn map[string]string // expected attributes for inbound participants - TestDMTF bool // run DTMF test + TrunkOut string // trunk ID for outbound call + RoomOut string // room for outbound call + AttrsOut map[string]string // expected attributes for outbound participants + TrunkIn string // trunk ID for inbound call + RuleIn string // rule ID for inbound call + AttrsIn map[string]string // expected attributes for inbound participants + NoDMTF bool // do not test DTMF } func TestSIPOutbound(t TB, ctx context.Context, lkOut, lkIn *LiveKit, params SIPOutboundTestParams) { - t.Log("creating sip participant") + t.Log("getting trunk info") + + trsOut, err := lkOut.SIP.GetSIPOutboundTrunksByIDs(ctx, []string{params.TrunkOut}) + require.NoError(t, err) + trOut := trsOut[0] + require.NotNil(t, trOut, "trunk not found") + require.NotEmpty(t, trOut.Numbers, "no trunk numbers for outbound") + numOut := trOut.Numbers[0] + t.Logf("using outbound trunk %q (%s, num: %s)", trOut.Name, trOut.SipTrunkId, numOut) + + trsIn, err := lkIn.SIP.GetSIPInboundTrunksByIDs(ctx, []string{params.TrunkIn}) + require.NoError(t, err) + trIn := trsIn[0] + require.NotNil(t, trIn, "trunk not found") + require.NotEmpty(t, trIn.Numbers, "no trunk numbers for inbound") + numIn := trIn.Numbers[0] + t.Logf("using inbound trunk %q (%s, num: %s)", trIn.Name, trIn.SipTrunkId, numIn) + + rulesIn, err := lkIn.SIP.GetSIPDispatchRulesByIDs(ctx, []string{params.RuleIn}) + require.NoError(t, err) + ruleIn := rulesIn[0] + require.NotNil(t, ruleIn, "rule not found") + require.True(t, len(ruleIn.TrunkIds) == 0 || slices.Contains(ruleIn.TrunkIds, trIn.SipTrunkId), "selected rule doesn't match the trunk") + ruleDir, ok := ruleIn.Rule.Rule.(*livekit.SIPDispatchRule_DispatchRuleDirect) + require.True(t, ok, "unsupported dispatch rule type %T", ruleIn.Rule.Rule) + rule := ruleDir.DispatchRuleDirect + roomIn := rule.RoomName + t.Logf("using dispatch rule %q (%s, room: %s)", ruleIn.Name, ruleIn.SipDispatchRuleId, roomIn) + const ( outIdentity = "siptest_outbound" outName = "Outbound Call" outMeta = `{"test":true, "dir": "out"}` ) var ( - inIdentity = "sip_" + params.NumberOut - inName = "Phone " + params.NumberOut + inIdentity = "sip_" + numOut + inName = "Phone " + numOut ) // Make sure we remove rooms when the test ends. // Some tests may reuse LK server, in which case the participants could stay in rooms for a long time. t.Cleanup(func() { _, _ = lkOut.Rooms.DeleteRoom(context.Background(), &livekit.DeleteRoomRequest{Room: params.RoomOut}) - _, _ = lkIn.Rooms.DeleteRoom(context.Background(), &livekit.DeleteRoomRequest{Room: params.RoomIn}) + _, _ = lkIn.Rooms.DeleteRoom(context.Background(), &livekit.DeleteRoomRequest{Room: roomIn}) }) // Make sure we delete inbound SIP participant. Outbound is deleted automatically by CreateSIPParticipant. t.Cleanup(func() { _, _ = lkIn.Rooms.RemoveParticipant(context.Background(), &livekit.RoomParticipantIdentity{ - Room: params.RoomIn, Identity: inIdentity, + Room: roomIn, Identity: inIdentity, }) }) - // Start the outbound call. It should hit Trunk Provider and initiate an inbound call back to the second server. - lkOut.CreateSIPParticipant(t, &livekit.CreateSIPParticipantRequest{ - SipTrunkId: params.TrunkOut, - SipCallTo: params.NumberIn, - RoomName: params.RoomOut, - ParticipantIdentity: outIdentity, - ParticipantName: outName, - ParticipantMetadata: outMeta, - Dtmf: params.RoomPin, - }) - const ( - nameOut = "testOut" - nameIn = "testIn" + identityTest = "test_probe" ) var ( - dataOut = make(chan lksdk.DataPacket, 20) - dataIn = make(chan lksdk.DataPacket, 20) + dataOut = make(chan lksdk.DataPacket, 20) + dataIn = make(chan lksdk.DataPacket, 20) + callIDOut atomic.Pointer[string] + callIDIn atomic.Pointer[string] + statusOut atomic.Pointer[string] + statusIn atomic.Pointer[string] + connected atomic.Bool ) + defer func() { + if !t.Failed() { + return + } + var idIn, idOut string + if p := callIDIn.Load(); p != nil { + idIn = *p + } + if p := callIDOut.Load(); p != nil { + idOut = *p + } + // Try explaining the test result. + if connected.Load() { + t.Errorf(`SIP connected, but media tests failed. + +Check logs for calls: +@callID:%s (outbound) +@callID:%s (inbound) + +Possible causes: +- Media ports are closed +- SDP negotiation failed +- DTMF failed`, + idOut, idIn, + ) + return + } + if idIn != "" && idOut != "" { + t.Errorf(`SIP participants connected, but participant info check failed. + +Check logs for calls: +@callID:%s (outbound, last state: %q) +@callID:%s (inbound, last state: %q)`, + idOut, statusOut.Load(), + idIn, statusIn.Load(), + ) + } else if idOut != "" { + t.Errorf(`Outbound call connected, but no inbound calls were received. + +Check logs for call: +@callID:%s (outbound, last state: %q) + +And search for dropped call for numbers: +@fromUser:%s (from) +@toUser:%s (to) + +Possible causes: +- Signaling is broken +- Signaling port is closed +- Signaling IP / Contact / Via are incorrect +- Password authentication failed`, + + idOut, statusOut.Load(), + numOut, numIn, + ) + } else { + t.Errorf(`Outbound call did not connect. + +Check logs for call: +@callID:%s (outbound, last state: %q)`, + + idOut, statusOut.Load(), + ) + } + }() // LK participants that will generate/listen for audio. - t.Log("connecting lk participant (outbound)") - pOut := lkOut.ConnectParticipant(t, params.RoomOut, nameOut, &lksdk.RoomCallback{ - ParticipantCallback: lksdk.ParticipantCallback{ - OnDataPacket: func(data lksdk.DataPacket, params lksdk.DataReceiveParams) { - select { - case dataOut <- data: - default: - } + t.Log("connecting test participants") + var ( + pOut *Participant + pIn *Participant + wgPar sync.WaitGroup + ) + wgPar.Add(2) + go func() { + defer wgPar.Done() + pOut = lkOut.ConnectParticipant(t, params.RoomOut, identityTest, &RoomParticipantCallback{ + RoomCallback: lksdk.RoomCallback{ + ParticipantCallback: lksdk.ParticipantCallback{ + OnDataPacket: func(data lksdk.DataPacket, params lksdk.DataReceiveParams) { + select { + case dataOut <- data: + default: + } + }, + }, }, - }, - }) - t.Log("connecting lk participant (inbound)") - pIn := lkIn.ConnectParticipant(t, params.RoomIn, nameIn, &lksdk.RoomCallback{ - ParticipantCallback: lksdk.ParticipantCallback{ - OnDataPacket: func(data lksdk.DataPacket, params lksdk.DataReceiveParams) { - select { - case dataIn <- data: - default: - } + OnSIPStatus: func(p *lksdk.RemoteParticipant, callID string, status string) { + callIDOut.Store(&callID) + statusOut.Store(&status) + t.Logf("sip outbound call %s (%s) status %v", callID, p.Identity(), status) }, - }, + }) + }() + go func() { + defer wgPar.Done() + pIn = lkIn.ConnectParticipant(t, roomIn, identityTest, &RoomParticipantCallback{ + RoomCallback: lksdk.RoomCallback{ + ParticipantCallback: lksdk.ParticipantCallback{ + OnDataPacket: func(data lksdk.DataPacket, params lksdk.DataReceiveParams) { + select { + case dataIn <- data: + default: + } + }, + }, + }, + OnSIPStatus: func(p *lksdk.RemoteParticipant, callID string, status string) { + callIDIn.Store(&callID) + statusIn.Store(&status) + t.Logf("sip inbound call %s (%s) status %v", callID, p.Identity(), status) + }, + }) + }() + wgPar.Wait() + + // Start the outbound call. It should hit Trunk Provider and initiate an inbound call back to the second server. + t.Log("creating sip participant") + r := lkOut.CreateSIPParticipant(t, &livekit.CreateSIPParticipantRequest{ + SipTrunkId: params.TrunkOut, + SipCallTo: numIn, + RoomName: params.RoomOut, + ParticipantIdentity: outIdentity, + ParticipantName: outName, + ParticipantMetadata: outMeta, + Dtmf: rule.Pin, }) + t.Logf("outbound call ID: %s", r.SipCallId) - t.Log("checking rooms (outbound)") + t.Log("waiting for outbound participant to become ready") expAttrsOut := map[string]string{ - "sip.callID": "", // special case - "sip.callTag": "", // special case - "sip.callIDFull": "", // special case + "sip.callID": r.SipCallId, // special case + "sip.callTag": AttrTestAny, // special case + "sip.callIDFull": AttrTestAny, // special case "sip.callStatus": "active", - "sip.trunkPhoneNumber": params.NumberOut, - "sip.phoneNumber": params.NumberIn, + "sip.trunkPhoneNumber": numOut, + "sip.phoneNumber": numIn, "sip.trunkID": params.TrunkOut, } for k, v := range params.AttrsOut { expAttrsOut[k] = v } lkOut.ExpectRoomWithParticipants(t, ctx, params.RoomOut, []ParticipantInfo{ - {Identity: nameOut, Kind: livekit.ParticipantInfo_STANDARD}, + {Identity: identityTest, Kind: livekit.ParticipantInfo_STANDARD}, { Identity: outIdentity, Name: outName, @@ -174,34 +301,35 @@ func TestSIPOutbound(t TB, ctx context.Context, lkOut, lkIn *LiveKit, params SIP Attributes: expAttrsOut, }, }) - t.Log("checking rooms (inbound)") + t.Log("waiting for inbound participant to become ready") expAttrsIn := map[string]string{ - "sip.callID": "", // special case - "sip.callTag": "", // special case - "sip.callIDFull": "", // special case + "sip.callID": AttrTestAny, // special case + "sip.callTag": AttrTestAny, // special case + "sip.callIDFull": AttrTestAny, // special case "sip.callStatus": "active", - "sip.trunkPhoneNumber": params.NumberIn, - "sip.phoneNumber": params.NumberOut, + "sip.trunkPhoneNumber": numIn, + "sip.phoneNumber": numOut, "sip.trunkID": params.TrunkIn, "sip.ruleID": params.RuleIn, } for k, v := range params.AttrsIn { expAttrsIn[k] = v } - lkIn.ExpectRoomWithParticipants(t, ctx, params.RoomIn, []ParticipantInfo{ - {Identity: nameIn, Kind: livekit.ParticipantInfo_STANDARD}, + lkIn.ExpectRoomWithParticipants(t, ctx, roomIn, []ParticipantInfo{ + {Identity: identityTest, Kind: livekit.ParticipantInfo_STANDARD}, { Identity: inIdentity, Name: inName, Kind: livekit.ParticipantInfo_SIP, - Metadata: params.MetaIn, + Metadata: ruleIn.Metadata, Attributes: expAttrsIn, }, }) + connected.Store(true) t.Log("testing audio") CheckAudioForParticipants(t, ctx, pOut, pIn) - if params.TestDMTF { + if !params.NoDMTF { t.Log("testing dtmf") CheckDTMFForParticipants(t, ctx, pOut, pIn, dataOut, dataIn) } diff --git a/test/lktest/testing.go b/test/lktest/testing.go index 9c703fa..28130de 100644 --- a/test/lktest/testing.go +++ b/test/lktest/testing.go @@ -29,6 +29,7 @@ const testing = false // do not allow import of testing package // TB mirrors testing.TB interface without including the actual testing package. type TB interface { + Failed() bool Cleanup(func()) Error(args ...any) Errorf(format string, args ...any) @@ -87,6 +88,7 @@ type skip struct{} type testingImpl struct { mu sync.Mutex + failed bool err error cleanup []func() } @@ -101,6 +103,12 @@ func (t *testingImpl) doCleanup() { } } +func (t *testingImpl) Failed() bool { + t.mu.Lock() + defer t.mu.Unlock() + return t.failed +} + func (t *testingImpl) Cleanup(f func()) { t.mu.Lock() defer t.mu.Unlock() @@ -111,6 +119,7 @@ func (t *testingImpl) setError(err error) { t.mu.Lock() defer t.mu.Unlock() t.err = err + t.failed = true } func (t *testingImpl) Error(args ...any) { @@ -127,6 +136,9 @@ func (t *testingImpl) Errorf(format string, args ...any) { } func (t *testingImpl) FailNow() { + t.mu.Lock() + t.failed = true + t.mu.Unlock() panic(fatal{}) }