Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: Federation /messages tests #443

Closed
wants to merge 12 commits into from
44 changes: 44 additions & 0 deletions internal/b/blueprints.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,3 +238,47 @@ func manyMessages(senders []string, count int) []Event {
}
return evs
}

func manJoinEvents(homeserver string, users []User) []Event {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func manJoinEvents(homeserver string, users []User) []Event {
func manyJoinEvents(homeserver string, users []User) []Event {

evs := make([]Event, len(users))
for i := 0; i < len(users); i++ {
user := users[i]
evs[i] = Event{
Type: "m.room.member",
StateKey: Ptr(fmt.Sprintf("%s:%s", user.Localpart, homeserver)),
Content: map[string]interface{}{
"membership": "join",
},
Sender: user.Localpart,
}
}
return evs
}

func manyUsers(count int) []User {
users := make([]User, count)

for i := 0; i < count; i++ {
localPart := fmt.Sprintf("@user_%d", i)
displayName := fmt.Sprintf("User %d", i)
deviceID := fmt.Sprintf("USERDEVICE%d", i)

users[i] = User{
Localpart: localPart,
DisplayName: displayName,
OneTimeKeys: 50,
DeviceID: Ptr(deviceID),
}
}

return users
}

func getSendersFromUsers(users []User) []string {
senders := make([]string, len(users))
for i := 0; i < len(users); i++ {
senders[i] = users[i].Localpart
}

return senders
}
19 changes: 0 additions & 19 deletions internal/b/perf_e2ee_room.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,22 +83,3 @@ func memberships(count int) []Event {

return events
}

func manyUsers(count int) []User {
users := make([]User, count)

for i := 0; i < count; i++ {
localPart := fmt.Sprintf("@alice_%d", i)
displayName := fmt.Sprintf("Alice %d", i)
deviceID := fmt.Sprintf("ALICEDEVICE%d", i)

users[i] = User{
Localpart: localPart,
DisplayName: displayName,
OneTimeKeys: 50,
DeviceID: Ptr(deviceID),
}
}

return users
}
45 changes: 33 additions & 12 deletions internal/b/perf_many_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,34 @@

package b

var manyUsersList = manyUsers(1000)

func makeEvents(homeserver string) []Event {
events := []Event{
{
Type: "m.room.member",
StateKey: Ptr("@bob:hs1"),
Content: map[string]interface{}{
"membership": "join",
},
Sender: "@bob",
},
}
events = append(events, manJoinEvents(homeserver, manyUsersList)...)
events = append(events, manyMessages(getSendersFromUsers(manyUsersList), 1400)...)

//fmt.Printf("events made %d events=%+v", len(events), events)

return events
}

// BlueprintPerfManyMessages contains a homeserver with 2 users, who are joined to the same room with thousands of messages.
var BlueprintPerfManyMessages = MustValidate(Blueprint{
Name: "perf_many_messages",
Homeservers: []Homeserver{
{
Name: "hs1",
Users: []User{
Users: append([]User{
{
Localpart: "@alice",
DisplayName: "Alice",
Expand All @@ -29,23 +50,23 @@ var BlueprintPerfManyMessages = MustValidate(Blueprint{
Localpart: "@bob",
DisplayName: "Bob",
},
},
}, manyUsersList...),
Rooms: []Room{
{
CreateRoom: map[string]interface{}{
"preset": "public_chat",
},
Creator: "@alice",
Events: append([]Event{
Event{
Type: "m.room.member",
StateKey: Ptr("@bob:hs1"),
Content: map[string]interface{}{
"membership": "join",
},
Sender: "@bob",
},
}, manyMessages([]string{"@alice", "@bob"}, 7000)...),
Events: makeEvents("hs1"),
},
},
},
{
Name: "hs2",
Users: []User{
{
Localpart: "@charlie",
DisplayName: "Charlie",
},
},
},
Expand Down
37 changes: 27 additions & 10 deletions internal/docker/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,16 +124,17 @@ func (d *Builder) removeImages() error {
d.log("Not cleaning up image with tags: %v", img.RepoTags)
continue
}
bprintName := img.Labels["complement_blueprint"]
//bprintName := img.Labels["complement_blueprint"]
contextStr := img.Labels[complementLabel]
keep := false
for _, keepBprint := range d.Config.KeepBlueprints {
if bprintName == keepBprint {
if contextStr == keepBprint {
keep = true
break
}
}
if keep {
d.log("Keeping image created from blueprint %s", bprintName)
d.log("Keeping image created from blueprint %s", contextStr)
continue
}
_, err = d.Docker.ImageRemove(context.Background(), img.ID, types.ImageRemoveOptions{
Expand Down Expand Up @@ -180,17 +181,33 @@ func (d *Builder) ConstructBlueprintIfNotExist(bprint b.Blueprint) error {
if err != nil {
return fmt.Errorf("ConstructBlueprintIfNotExist(%s): failed to ImageList: %w", bprint.Name, err)
}
if len(images) == 0 {
err = d.ConstructBlueprint(bprint)

var missingHomeservers []b.Homeserver
for _, homeserver := range bprint.Homeservers {
found := false
for _, image := range images {
if image.Labels["complement_hs_name"] == homeserver.Name {
found = true
break
}
}

if !found {
missingHomeservers = append(missingHomeservers, homeserver)
}
}

if len(images) < len(bprint.Homeservers) {
err = d.ConstructBlueprint(bprint, missingHomeservers)
if err != nil {
return fmt.Errorf("ConstructBlueprintIfNotExist(%s): failed to ConstructBlueprint: %w", bprint.Name, err)
}
}
return nil
}

func (d *Builder) ConstructBlueprint(bprint b.Blueprint) error {
errs := d.construct(bprint)
func (d *Builder) ConstructBlueprint(bprint b.Blueprint, homeserversToConstruct []b.Homeserver) error {
errs := d.construct(bprint, homeserversToConstruct)
if len(errs) > 0 {
for _, err := range errs {
d.log("could not construct blueprint: %s", err)
Expand Down Expand Up @@ -237,7 +254,7 @@ func (d *Builder) ConstructBlueprint(bprint b.Blueprint) error {
}

// construct all Homeservers sequentially then commits them
func (d *Builder) construct(bprint b.Blueprint) (errs []error) {
func (d *Builder) construct(bprint b.Blueprint, homeserversToConstruct []b.Homeserver) (errs []error) {
d.log("Constructing blueprint '%s'", bprint.Name)

networkID, err := createNetworkIfNotExists(d.Docker, d.Config.PackageNamespace, bprint.Name)
Expand All @@ -246,8 +263,8 @@ func (d *Builder) construct(bprint b.Blueprint) (errs []error) {
}

runner := instruction.NewRunner(bprint.Name, d.Config.BestEffort, d.Config.DebugLoggingEnabled)
results := make([]result, len(bprint.Homeservers))
for i, hs := range bprint.Homeservers {
results := make([]result, len(homeserversToConstruct))
for i, hs := range homeserversToConstruct {
res := d.constructHomeserver(bprint.Name, runner, hs, networkID)
if res.err != nil {
errs = append(errs, res.err)
Expand Down
53 changes: 53 additions & 0 deletions tests/federation_room_messages_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package tests

import (
"net/url"
"testing"
"time"

"github.com/matrix-org/complement/internal/b"
"github.com/matrix-org/complement/internal/client"
"github.com/sirupsen/logrus"
)

func TestMessagesOverFederation(t *testing.T) {
deployment := Deploy(t, b.BlueprintPerfManyMessages)
defer deployment.Destroy(t)

alice := deployment.Client(t, "hs1", "@alice:hs1")

remoteCharlie := deployment.Client(t, "hs2", "@charlie:hs2")

t.Run("parallel", func(t *testing.T) {
t.Run("asdf", func(t *testing.T) {
t.Parallel()

syncResult, _ := alice.MustSync(t, client.SyncReq{})
joinedRooms := syncResult.Get("rooms.join|@keys")
roomWithManyMessages := joinedRooms.Get("0").String()

// logrus.WithFields(logrus.Fields{
// "joinedRooms": joinedRooms,
// "roomWithManyMessages": roomWithManyMessages,
// }).Error("asdf")

remoteCharlie.JoinRoom(t, roomWithManyMessages, []string{"hs1"})

messagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomWithManyMessages, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
"dir": []string{"b"},
"limit": []string{"500"},
}))
messagesResBody := client.ParseJSON(t, messagesRes)
eventIDs := client.GetJSONFieldStringArray(t, messagesResBody, "chunk")

logrus.WithFields(logrus.Fields{
"joinedRooms": joinedRooms,
"roomWithManyMessages": roomWithManyMessages,
"eventIDsLength": len(eventIDs),
"eventIDs": eventIDs,
}).Error("asdf")

time.Sleep(5 * time.Second)
})
})
}