Skip to content

Commit

Permalink
Better error reporting for E2E test.
Browse files Browse the repository at this point in the history
  • Loading branch information
dennwc committed Dec 12, 2024
1 parent c6a3cff commit 119e697
Show file tree
Hide file tree
Showing 6 changed files with 306 additions and 122 deletions.
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@ require (
github.com/jfreymuth/oggvorbis v1.0.5
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20241128072814-c363618d4c98
github.com/livekit/protocol v1.28.2-0.20241128072830-b738aedbd841
github.com/livekit/protocol v1.29.5-0.20241209183753-f6b5078b2244
github.com/livekit/psrpc v0.6.1-0.20241018124827-1efff3d113a8
github.com/livekit/server-sdk-go/v2 v2.4.0
github.com/livekit/server-sdk-go/v2 v2.4.1-0.20241211082531-7610e1639c28
github.com/livekit/sipgo v0.13.2-0.20241209123643-27500ef99c39
github.com/mjibson/go-dsp v0.0.0-20180508042940-11479a337f12
github.com/ory/dockertest/v3 v3.10.0
github.com/pion/interceptor v0.1.37
github.com/pion/rtp v1.8.9
github.com/pion/sdp/v3 v3.0.9
github.com/pion/webrtc/v4 v4.0.4
github.com/pion/webrtc/v4 v4.0.5
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.20.5
github.com/sirupsen/logrus v1.9.3
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,12 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20241128072814-c363618d4c98 h1:QA7DqIC/ZSsMj8HC0+zNfMMwssHbA0alZALK68r30LQ=
github.com/livekit/mediatransportutil v0.0.0-20241128072814-c363618d4c98/go.mod h1:WIVFAGzVZ7VMjPC5+nbSfwdFjWcbuLgx97KeNSUDTEo=
github.com/livekit/protocol v1.28.2-0.20241128072830-b738aedbd841 h1:69dSvfL6H6odFhL9q4s+RjDRDdfLY+WUUQ/Lz0av2Bs=
github.com/livekit/protocol v1.28.2-0.20241128072830-b738aedbd841/go.mod h1:mqXSWNHbENjxM0/HG25wZ7wgja/K9fA0PeQxi+MPmWw=
github.com/livekit/protocol v1.29.5-0.20241209183753-f6b5078b2244 h1:Eg9HK+5bMCDRKhh5g5g16oyNaMbCqMrJvxFBaBuP7Vo=
github.com/livekit/protocol v1.29.5-0.20241209183753-f6b5078b2244/go.mod h1:NDg1btMpKCzr/w6QR5kDuXw/e4Y7yOBE+RUAHsc+Y/M=
github.com/livekit/psrpc v0.6.1-0.20241018124827-1efff3d113a8 h1:Ibh0LoFl5NW5a1KFJEE0eLxxz7dqqKmYTj/BfCb0PbY=
github.com/livekit/psrpc v0.6.1-0.20241018124827-1efff3d113a8/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/livekit/server-sdk-go/v2 v2.4.0 h1:ide41hppBf7btHLz/nj6rLIQSkaIOxP5tVSki74ZDhg=
github.com/livekit/server-sdk-go/v2 v2.4.0/go.mod h1:0hzAkh/FegPZmXDp8Ai92ndP/mWVpBxeR5VnR3muQp4=
github.com/livekit/server-sdk-go/v2 v2.4.1-0.20241211082531-7610e1639c28 h1:LadsWjdymTEST6ny/huFg5n4IoS7suvXnSrF+RhzBqo=
github.com/livekit/server-sdk-go/v2 v2.4.1-0.20241211082531-7610e1639c28/go.mod h1:o6KTJ9UmNpurEbJBKWbHTDhBthILVG1kms5rAmKOIIY=
github.com/livekit/sipgo v0.13.2-0.20241209123643-27500ef99c39 h1:Lm1cv4AlKKvprrjxsg7ilnzA3XC6ivxqLGAqTJkBdcM=
github.com/livekit/sipgo v0.13.2-0.20241209123643-27500ef99c39/go.mod h1:nbNi0IsYn4tyY2ab7Rafvifty07miHYvgedPMKWbaI4=
github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o=
Expand Down Expand Up @@ -192,8 +192,8 @@ github.com/pion/transport/v3 v3.0.7 h1:iRbMH05BzSNwhILHoBoAPxoB9xQgOaJk+591KC9P1
github.com/pion/transport/v3 v3.0.7/go.mod h1:YleKiTZ4vqNxVwh77Z0zytYi7rXHl7j6uPLGhhz9rwo=
github.com/pion/turn/v4 v4.0.0 h1:qxplo3Rxa9Yg1xXDxxH8xaqcyGUtbHYw4QSCvmFWvhM=
github.com/pion/turn/v4 v4.0.0/go.mod h1:MuPDkm15nYSklKpN8vWJ9W2M0PlyQZqYt1McGuxG7mA=
github.com/pion/webrtc/v4 v4.0.4 h1:X+gkoBLKDsR6FliKKQ/VXGBjnMR3yOPcyXEPt3z7Ep0=
github.com/pion/webrtc/v4 v4.0.4/go.mod h1:LvP8Np5b/sM0uyJIcUPvJcCvhtjHxJwzh2H2PYzE6cQ=
github.com/pion/webrtc/v4 v4.0.5 h1:8cVPojcv3cQTwVga2vF1rzCNvkiEimnYdCCG7yF317I=
github.com/pion/webrtc/v4 v4.0.5/go.mod h1:LvP8Np5b/sM0uyJIcUPvJcCvhtjHxJwzh2H2PYzE6cQ=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down
44 changes: 20 additions & 24 deletions test/integration/sip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (s *SIPServer) CreateTrunkOut(t testing.TB, trunk *livekit.SIPOutboundTrunk
if err != nil {
t.Fatal(err)
}
t.Log("Trunk (out):", tr.SipTrunkId)
t.Log("New trunk (outbound):", tr.SipTrunkId)
return tr.SipTrunkId
}

Expand All @@ -144,7 +144,7 @@ func (s *SIPServer) CreateTrunkIn(t testing.TB, trunk *livekit.SIPInboundTrunkIn
if err != nil {
t.Fatal(err)
}
t.Log("Trunk (in):", tr.SipTrunkId)
t.Log("New trunk (inbound):", tr.SipTrunkId)
return tr.SipTrunkId
}

Expand Down Expand Up @@ -181,6 +181,7 @@ func (s *SIPServer) CreateTrunkAndIndividual(t testing.TB, trunk *livekit.SIPInb
func (s *SIPServer) CreateDirectDispatch(t testing.TB, room, pin string, meta string, attrs map[string]string) string {
ctx := context.Background()
dr, err := s.Client.CreateSIPDispatchRule(ctx, &livekit.CreateSIPDispatchRuleRequest{
Name: room,
Metadata: meta,
Attributes: attrs,
Rule: &livekit.SIPDispatchRule{
Expand All @@ -194,7 +195,7 @@ func (s *SIPServer) CreateDirectDispatch(t testing.TB, room, pin string, meta st
if err != nil {
t.Fatal(err)
}
t.Log("Dispatch (direct):", dr.SipDispatchRuleId)
t.Log("New dispatch rule (direct):", dr.SipDispatchRuleId)
return dr.SipDispatchRuleId
}

Expand Down Expand Up @@ -357,7 +358,7 @@ func TestSIPJoinOpenRoom(t *testing.T) {
Kind: livekit.ParticipantInfo_SIP,
Metadata: meta,
Attributes: map[string]string{
"sip.callID": "<test>", // special case
"sip.callID": lktest.AttrTestAny, // special case
"sip.callStatus": "active",
"sip.trunkPhoneNumber": serverNumber,
"sip.phoneNumber": clientNumber,
Expand Down Expand Up @@ -519,7 +520,7 @@ func TestSIPJoinPinRoom(t *testing.T) {
Kind: livekit.ParticipantInfo_SIP,
Metadata: meta,
Attributes: map[string]string{
"sip.callID": "<test>", // special case
"sip.callID": lktest.AttrTestAny, // special case
"sip.callStatus": "active",
"sip.trunkPhoneNumber": serverNumber,
"sip.phoneNumber": clientNumber,
Expand Down Expand Up @@ -592,7 +593,7 @@ func TestSIPJoinPinRoom(t *testing.T) {
Kind: livekit.ParticipantInfo_SIP,
Metadata: meta,
Attributes: map[string]string{
"sip.callID": "<test>", // special case
"sip.callID": lktest.AttrTestAny, // special case
"sip.callStatus": "active",
"sip.trunkPhoneNumber": serverNumber,
"sip.phoneNumber": clientNumber,
Expand Down Expand Up @@ -649,7 +650,7 @@ func TestSIPJoinOpenRoomWithPin(t *testing.T) {
Kind: livekit.ParticipantInfo_SIP,
Metadata: meta,
Attributes: map[string]string{
"sip.callID": "<test>", // special case
"sip.callID": lktest.AttrTestAny, // special case
"sip.callStatus": "active",
"sip.trunkPhoneNumber": serverNumber,
"sip.phoneNumber": clientNumber,
Expand Down Expand Up @@ -718,7 +719,7 @@ func TestSIPJoinRoomIndividual(t *testing.T) {
Kind: livekit.ParticipantInfo_SIP,
Metadata: meta,
Attributes: map[string]string{
"sip.callID": "<test>", // special case
"sip.callID": lktest.AttrTestAny, // special case
"sip.callStatus": "active",
"sip.trunkPhoneNumber": serverNumber,
"sip.phoneNumber": clientNumber,
Expand Down Expand Up @@ -793,7 +794,7 @@ func TestSIPAudio(t *testing.T) {
Kind: livekit.ParticipantInfo_SIP,
Metadata: meta,
Attributes: map[string]string{
"sip.callID": "<test>", // special case
"sip.callID": lktest.AttrTestAny, // special case
"sip.callStatus": "active",
"sip.trunkPhoneNumber": serverNumber,
"sip.phoneNumber": fmt.Sprintf("+%d", 111111111*(i+1)),
Expand Down Expand Up @@ -845,7 +846,6 @@ func TestSIPOutbound(t *testing.T) {
userName = "test-user"
userPass = "test-pass"
roomPin = "*1234"
dtmfPin = "ww*12w34ww#" // with added delays
meta = `{"test":true}`
)

Expand All @@ -863,15 +863,16 @@ func TestSIPOutbound(t *testing.T) {
headersIn := map[string]string{
"X-LK-From-1": "inbound",
}
roomPin, dtmfPin := roomPin, dtmfPin
roomPin := roomPin
if withPin {
// We cannot set headers because of the PIN. See TestSIPJoinPinRoom for details.
delete(headersIn, "X-LK-From-1")
} else {
roomPin, dtmfPin = "", ""
roomPin = ""
}
// Configure Trunk for inbound server.
trunkIn := srvIn.CreateTrunkIn(t, &livekit.SIPInboundTrunkInfo{
Name: "Test In",
Numbers: []string{serverNumber},
AuthUsername: userName,
AuthPassword: userPass,
Expand All @@ -890,6 +891,7 @@ func TestSIPOutbound(t *testing.T) {

// Configure Trunk for outbound server and make a SIP call.
trunkOut := srvOut.CreateTrunkOut(t, &livekit.SIPOutboundTrunkInfo{
Name: "Test Out",
Numbers: []string{clientNumber},
Address: srvIn.Address,
Transport: tr,
Expand Down Expand Up @@ -923,18 +925,12 @@ func TestSIPOutbound(t *testing.T) {
// Running sub test here is important, because TestSIPOutbound registers Cleanup funcs.
t.Run(fmt.Sprintf("run %d", i+1), func(t *testing.T) {
lktest.TestSIPOutbound(t, ctx, lkOut.LiveKit, lkIn.LiveKit, lktest.SIPOutboundTestParams{
TrunkOut: trunkOut,
NumberOut: clientNumber,
RoomOut: "outbound",
TrunkIn: trunkIn,
RuleIn: ruleIn,
NumberIn: serverNumber,
RoomIn: roomIn,
RoomPin: dtmfPin,
MetaIn: meta,
AttrsIn: expAttrsIn,
AttrsOut: expAttrsOut,
TestDMTF: true,
TrunkOut: trunkOut,
RoomOut: "outbound",
TrunkIn: trunkIn,
RuleIn: ruleIn,
AttrsIn: expAttrsIn,
AttrsOut: expAttrsOut,
})
})
}
Expand Down
86 changes: 67 additions & 19 deletions test/lktest/livekit.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (lk *LiveKit) RoomParticipants(t TB, room string) []*livekit.ParticipantInf
return resp.Participants
}

func (lk *LiveKit) CreateSIPParticipant(t TB, req *livekit.CreateSIPParticipantRequest) {
func (lk *LiveKit) CreateSIPParticipant(t TB, req *livekit.CreateSIPParticipantRequest) *livekit.SIPParticipantInfo {
r, err := lk.SIP.CreateSIPParticipant(context.Background(), req)
if err != nil {
t.Fatal(err)
Expand All @@ -92,6 +92,7 @@ func (lk *LiveKit) CreateSIPParticipant(t TB, req *livekit.CreateSIPParticipantR
Room: req.RoomName, Identity: r.ParticipantIdentity,
})
})
return r
}

func (lk *LiveKit) Connect(t TB, room, identity string, cb *lksdk.RoomCallback) *lksdk.Room {
Expand Down Expand Up @@ -141,9 +142,17 @@ func (lk *LiveKit) ConnectWithAudio(t TB, room, identity string, cb *lksdk.RoomC
return r
}

func (lk *LiveKit) ConnectParticipant(t TB, room, identity string, cb *lksdk.RoomCallback) *Participant {
type RoomParticipantCallback struct {
lksdk.RoomCallback
OnSIPStatus func(p *lksdk.RemoteParticipant, callID string, status string)
}

func (lk *LiveKit) ConnectParticipant(t TB, room, identity string, cb *RoomParticipantCallback) *Participant {
var origCB lksdk.RoomCallback
if cb == nil {
cb = new(lksdk.RoomCallback)
cb = new(RoomParticipantCallback)
} else {
origCB = cb.RoomCallback
}
p := &Participant{t: t}
pr, pw := media.Pipe[media.PCM16Sample](RoomSampleRate)
Expand All @@ -153,14 +162,17 @@ func (lk *LiveKit) ConnectParticipant(t TB, room, identity string, cb *lksdk.Roo
})
p.AudioIn = pr
p.mixIn = mixer.NewMixer(pw, rtp.DefFrameDur)
cb.ParticipantCallback.OnTrackPublished = func(pub *lksdk.RemoteTrackPublication, rp *lksdk.RemoteParticipant) {
cb.OnTrackPublished = func(pub *lksdk.RemoteTrackPublication, rp *lksdk.RemoteParticipant) {
if pub.Kind() == lksdk.TrackKindAudio {
if err := pub.SetSubscribed(true); err != nil {
t.Error("cannot subscribe to the track", pub.SID(), err)
}
}
if origCB.OnTrackPublished != nil {
origCB.OnTrackPublished(pub, rp)
}
}
cb.ParticipantCallback.OnTrackSubscribed = func(track *webrtc.TrackRemote, pub *lksdk.RemoteTrackPublication, rp *lksdk.RemoteParticipant) {
cb.OnTrackSubscribed = func(track *webrtc.TrackRemote, pub *lksdk.RemoteTrackPublication, rp *lksdk.RemoteParticipant) {
inp := p.mixIn.NewInput()
defer inp.Close()

Expand All @@ -173,14 +185,50 @@ func (lk *LiveKit) ConnectParticipant(t TB, room, identity string, cb *lksdk.Roo
h := rtp.NewMediaStreamIn[opus.Sample](odec)
_ = rtp.HandleLoop(track, h)
}
cb.OnParticipantConnected = func(p *lksdk.RemoteParticipant) {
if origCB.OnParticipantConnected != nil {
origCB.OnParticipantConnected(p)
}
switch p.Kind() {
case lksdk.ParticipantSIP:
if cb.OnSIPStatus != nil {
callID := p.Attributes()[livekit.AttrSIPCallID]
status := p.Attributes()[livekit.AttrSIPCallStatus]
cb.OnSIPStatus(p, callID, status)
}
}
}
cb.OnParticipantDisconnected = func(p *lksdk.RemoteParticipant) {
if origCB.OnParticipantDisconnected != nil {
origCB.OnParticipantDisconnected(p)
}
switch p.Kind() {
case lksdk.ParticipantSIP:
if cb.OnSIPStatus != nil {
callID := p.Attributes()[livekit.AttrSIPCallID]
status := p.Attributes()[livekit.AttrSIPCallStatus]
if status == "" {
status = "disconnect-unk"
}
cb.OnSIPStatus(p, callID, status)
}
}
}
cb.OnAttributesChanged = func(changed map[string]string, p lksdk.Participant) {
name := ""
if p != nil {
name = p.Name()
if origCB.OnAttributesChanged != nil {
origCB.OnAttributesChanged(changed, p)
}
switch p.Kind() {
case lksdk.ParticipantSIP:
rp, _ := p.(*lksdk.RemoteParticipant)
if rp != nil && cb.OnSIPStatus != nil {
callID := p.Attributes()[livekit.AttrSIPCallID]
status := p.Attributes()[livekit.AttrSIPCallStatus]
cb.OnSIPStatus(rp, callID, status)
}
}
t.Logf("attributes changed: %s: %v", name, changed)
}
p.Room = lk.Connect(t, room, identity, cb)
p.Room = lk.Connect(t, room, identity, &cb.RoomCallback)
for _, rp := range p.Room.GetRemoteParticipants() {
for _, pub := range rp.TrackPublications() {
cb.ParticipantCallback.OnTrackPublished(pub.(*lksdk.RemoteTrackPublication), rp)
Expand Down Expand Up @@ -394,7 +442,7 @@ func compareParticipants(t TB, exp *ParticipantInfo, got *livekit.ParticipantInf
return nil
}

func (lk *LiveKit) ExpectParticipants(t TB, ctx context.Context, room string, participants []ParticipantInfo) {
func (lk *LiveKit) ExpectParticipants(t TB, ctx context.Context, room string, participants []ParticipantInfo) []*livekit.ParticipantInfo {
slices.SortFunc(participants, func(a, b ParticipantInfo) int {
return strings.Compare(a.Identity, b.Identity)
})
Expand All @@ -407,7 +455,7 @@ wait:
select {
case <-ctx.Done():
require.Len(t, list, len(participants), "timeout waiting for participants")
return
return nil
case <-ticker.C:
continue wait
}
Expand All @@ -421,13 +469,13 @@ wait:
select {
case <-ctx.Done():
require.NoError(t, err)
return
return nil
case <-ticker.C:
continue wait
}
}
}
return // all good
return list // all good
}
}

Expand Down Expand Up @@ -463,18 +511,18 @@ func (lk *LiveKit) waitRooms(t TB, ctx context.Context, none bool, filter func(r
}
}

func (lk *LiveKit) ExpectRoomWithParticipants(t TB, ctx context.Context, room string, participants []ParticipantInfo) {
func (lk *LiveKit) ExpectRoomWithParticipants(t TB, ctx context.Context, room string, participants []ParticipantInfo) []*livekit.ParticipantInfo {
filter := func(r *livekit.Room) bool {
return r.Name == room
}
rooms := lk.waitRooms(t, ctx, len(participants) == 0, filter)
if len(participants) == 0 && len(rooms) == 0 {
return
return nil
}
require.Len(t, rooms, 1)
require.True(t, filter(rooms[0]))

lk.ExpectParticipants(t, ctx, room, participants)
return lk.ExpectParticipants(t, ctx, room, participants)
}

func (lk *LiveKit) ExpectRoomPref(t TB, ctx context.Context, pref, number string, none bool) *livekit.Room {
Expand All @@ -488,7 +536,7 @@ func (lk *LiveKit) ExpectRoomPref(t TB, ctx context.Context, pref, number string
return rooms[0]
}

func (lk *LiveKit) ExpectRoomPrefWithParticipants(t TB, ctx context.Context, pref, number string, participants []ParticipantInfo) {
func (lk *LiveKit) ExpectRoomPrefWithParticipants(t TB, ctx context.Context, pref, number string, participants []ParticipantInfo) []*livekit.ParticipantInfo {
room := lk.ExpectRoomPref(t, ctx, pref, number, len(participants) != 0)
lk.ExpectParticipants(t, ctx, room.Name, participants)
return lk.ExpectParticipants(t, ctx, room.Name, participants)
}
Loading

0 comments on commit 119e697

Please sign in to comment.