Skip to content

Commit

Permalink
Add to-device event tests over federation; incl. connectivity tests (#…
Browse files Browse the repository at this point in the history
…694)

* Add to-device event tests over federation; incl. connectivity tests

The connectivity tests form part of complement-crypto's test suite,
specifically:

> If a server cannot send device list updates over federation, it retries.

* Test we retry on startup

* Comment why we need to poke

* Update tests/federation_to_device_test.go

Co-authored-by: Till <[email protected]>

---------

Co-authored-by: Kegan Dougal <=>
Co-authored-by: Till <[email protected]>
  • Loading branch information
kegsay and S7evinK authored Nov 27, 2023
1 parent 25a2d5c commit 8a0df24
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 0 deletions.
4 changes: 4 additions & 0 deletions internal/docker/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ func (d *Deployment) Restart(t *testing.T) error {

func (d *Deployment) StartServer(t *testing.T, hsName string) {
t.Helper()
t.Logf("StartServer %s", hsName)
hsDep := d.HS[hsName]
if hsDep == nil {
t.Fatalf("StartServer: %s does not exist in this deployment", hsName)
Expand All @@ -257,6 +258,7 @@ func (d *Deployment) StartServer(t *testing.T, hsName string) {

func (d *Deployment) StopServer(t *testing.T, hsName string) {
t.Helper()
t.Logf("StopServer %s", hsName)
hsDep := d.HS[hsName]
if hsDep == nil {
t.Fatalf("StopServer: %s does not exist in this deployment", hsName)
Expand All @@ -268,6 +270,7 @@ func (d *Deployment) StopServer(t *testing.T, hsName string) {

func (d *Deployment) PauseServer(t *testing.T, hsName string) {
t.Helper()
t.Logf("PauseServer %s", hsName)
hsDep := d.HS[hsName]
if hsDep == nil {
t.Fatalf("PauseServer: %s does not exist in this deployment", hsName)
Expand All @@ -279,6 +282,7 @@ func (d *Deployment) PauseServer(t *testing.T, hsName string) {

func (d *Deployment) UnpauseServer(t *testing.T, hsName string) {
t.Helper()
t.Logf("UnpauseServer %s", hsName)
hsDep := d.HS[hsName]
if hsDep == nil {
t.Fatalf("UnpauseServer: %s does not exist in this deployment", hsName)
Expand Down
129 changes: 129 additions & 0 deletions tests/federation_to_device_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package tests

import (
"reflect"
"sync/atomic"
"testing"
"time"

"github.com/matrix-org/complement"
"github.com/matrix-org/complement/client"
"github.com/matrix-org/complement/helpers"
"github.com/tidwall/gjson"
)

// TestToDeviceMessagesOverFederation tests that to-device messages can go from
// one homeserver (hs1) to another (hs2).
//
// Each sub-test registers a fresh alice on hs1 and bob on hs2, makes hs2
// unreachable in a case-specific way, sends a to-device message from alice to
// bob, makes hs2 reachable again and then syncs as bob until the message shows
// up. The cases differ only in how reachability is interrupted.
func TestToDeviceMessagesOverFederation(t *testing.T) {
	deployment := complement.Deploy(t, 2)
	defer deployment.Destroy(t)

	testCases := []struct {
		name string
		// makeUnreachable is invoked before alice sends the to-device message.
		makeUnreachable func(t *testing.T)
		// makeReachable is invoked after the message was sent (plus a grace period).
		makeReachable func(t *testing.T)
	}{
		{
			// baseline: the receiving server is never taken away.
			name:            "good connectivity",
			makeUnreachable: func(t *testing.T) {},
			makeReachable:   func(t *testing.T) {},
		},
		{
			// cut networking but keep in-memory state.
			// This must pause/unpause rather than stop/start: stopping hs2 is what the
			// "stopped server" case below does, and would make the two cases
			// indistinguishable on the receiving side.
			name: "interrupted connectivity",
			makeUnreachable: func(t *testing.T) {
				deployment.PauseServer(t, "hs2")
			},
			makeReachable: func(t *testing.T) {
				deployment.UnpauseServer(t, "hs2")
			},
		},
		{
			// interesting because this nukes memory
			name: "stopped server",
			makeUnreachable: func(t *testing.T) {
				deployment.StopServer(t, "hs2")
			},
			makeReachable: func(t *testing.T) {
				// kick over the sending server first to see if the server
				// remembers to resend on startup
				deployment.StopServer(t, "hs1")
				deployment.StartServer(t, "hs1")
				// now make the receiving server reachable.
				deployment.StartServer(t, "hs2")
			},
		},
	}

	for _, tc := range testCases {
		tc := tc // capture the per-iteration value for the closure below
		t.Run(tc.name, func(t *testing.T) {
			alice := deployment.Register(t, "hs1", helpers.RegistrationOpts{
				LocalpartSuffix: "alice",
			})
			bob := deployment.Register(t, "hs2", helpers.RegistrationOpts{
				LocalpartSuffix: "bob",
			})
			// it might take a while for retries, so keep on syncing!
			bob.SyncUntilTimeout = 30 * time.Second

			// Take a sync token before anything is sent so the later sync only
			// has to look at what arrived afterwards.
			_, bobSince := bob.MustSync(t, client.SyncReq{TimeoutMillis: "0"})

			content := map[string]interface{}{
				"my_key": "my_value",
			}

			tc.makeUnreachable(t)

			alice.MustSendToDeviceMessages(t, "my.test.type", map[string]map[string]map[string]interface{}{
				bob.UserID: {
					bob.DeviceID: content,
				},
			})

			// checkEvent reports whether a to-device event is the one sent above:
			// it must have the test type and a JSON object content deep-equal to
			// what alice sent.
			checkEvent := func(result gjson.Result) bool {
				if result.Get("type").Str != "my.test.type" {
					return false
				}

				evContentRes := result.Get("content")

				if !evContentRes.Exists() || !evContentRes.IsObject() {
					return false
				}

				evContent := evContentRes.Value()

				return reflect.DeepEqual(evContent, content)
			}
			// just in case the server returns 200 OK before flushing to disk, give it a grace period.
			// This is too nice of us, given that in the real world no grace is provided.
			time.Sleep(time.Second)

			tc.makeReachable(t)

			// servers may need to be poked with another to-device msg. This isn't great.
			// See https://github.com/matrix-org/synapse/issues/16680
			// bob has a sync timeout of 30s set, so if the test has not yet passed, we are kicking the server
			// after 10s to ensure the server processes the previous sent to-device message.
			var completed atomic.Bool
			// Mark completion on every exit path of this sub-test, not only the
			// successful one. If the sync below aborts the sub-test (presumably via
			// t.Fatalf, which still runs deferred calls), the kicker goroutine must
			// not go on to use alice/t after the sub-test has finished.
			defer completed.Store(true)
			go func() {
				time.Sleep(10 * time.Second)
				if completed.Load() {
					return
				}
				// maybe kicking the server will make things work if we're still waiting after 10s
				alice.MustSendToDeviceMessages(t, "kick.type", map[string]map[string]map[string]interface{}{
					bob.UserID: {
						bob.DeviceID: content,
					},
				})
			}()

			bob.MustSyncUntil(t, client.SyncReq{Since: bobSince}, func(clientUserID string, topLevelSyncJSON gjson.Result) error {
				// log every sync response to aid debugging when delivery is slow or never happens
				t.Logf("%s", topLevelSyncJSON.Raw)
				return client.SyncToDeviceHas(alice.UserID, checkEvent)(clientUserID, topLevelSyncJSON)
			})
		})
	}
}

0 comments on commit 8a0df24

Please sign in to comment.