Skip to content

Commit

Permalink
Add regression test for matrix-org/synapse#16463 (#681)
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston authored Oct 19, 2023
1 parent 39b3b9d commit fdfbde6
Showing 1 changed file with 290 additions and 0 deletions.
290 changes: 290 additions & 0 deletions tests/csapi/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (
"github.com/matrix-org/complement/helpers"
"github.com/matrix-org/complement/internal/federation"
"github.com/matrix-org/complement/runtime"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/gomatrixserverlib/fclient"
"github.com/matrix-org/util"
)

// Observes "first bug" from https://github.com/matrix-org/dendrite/pull/1394#issuecomment-687056673
Expand Down Expand Up @@ -374,6 +377,191 @@ func TestSync(t *testing.T) {
})
}

// This is a regression test for
// https://github.com/matrix-org/synapse/issues/16463
//
// We test this by having a local user (alice) and remote user (charlie) in a
// room. Charlie sends 50+ messages into the room without sending to Alice's
// server. Charlie then sends one more which get sent to Alice.
//
// Alice should observe that she receives some (though not all) of charlie's
// events, with the `limited` flag set.
func TestSyncTimelineGap(t *testing.T) {
runtime.SkipIf(t, runtime.Dendrite)
deployment := complement.Deploy(t, 1)
defer deployment.Destroy(t)
alice := deployment.Register(t, "hs1", helpers.RegistrationOpts{})

srv := federation.NewServer(t, deployment,
federation.HandleKeyRequests(),
federation.HandleTransactionRequests(nil, nil),
)
cancel := srv.Listen()
defer cancel()

charlie := srv.UserID("charlie")

roomID := alice.MustCreateRoom(t, map[string]interface{}{"preset": "public_chat"})
room := srv.MustJoinRoom(t, deployment, "hs1", roomID, charlie)

filterID := createFilter(t, alice, map[string]interface{}{
"room": map[string]interface{}{
"timeline": map[string]interface{}{
"limit": 20,
},
},
})
_, nextBatch := alice.MustSync(t, client.SyncReq{Filter: filterID})
t.Logf("Next batch %s", nextBatch)

alice.SendEventSynced(t, roomID, b.Event{
Type: "m.room.message",
Sender: alice.UserID,
Content: map[string]interface{}{
"body": "Hi from Alice!",
"msgtype": "m.text",
},
})

// Create 50 messages, but don't send them to Alice
var missingEvents []gomatrixserverlib.PDU
for i := 0; i < 50; i++ {
event := srv.MustCreateEvent(t, room, federation.Event{
Type: "m.room.message",
Sender: charlie,
Content: map[string]interface{}{
"body": "Remote message",
"msgtype": "m.text",
},
})
room.AddEvent(event)
missingEvents = append(missingEvents, event)
}

// Create one more event that we will send to Alice, which references the
// previous 50.
lastEvent := srv.MustCreateEvent(t, room, federation.Event{
Type: "m.room.message",
Sender: charlie,
Content: map[string]interface{}{
"body": "End",
"msgtype": "m.text",
},
})
room.AddEvent(lastEvent)

// Alice's HS will try and fill in the gap, so we need to respond to those
// requests.
respondToGetMissingEventsEndpoints(t, srv, room, missingEvents)

srv.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{lastEvent.JSON()}, nil)

// We now test two different modes of /sync work. The first is when we are
// syncing when the server receives the `lastEvent` (and so, at least
// Synapse, will start sending down some events immediately). In this mode
// we may see alice's message, but charlie's messages should set the limited
// flag.
//
// The second mode is when we incremental sync *after* all the events have
// finished being persisted, and so we get only charlie's messages.
t.Run("incremental", func(t *testing.T) {
timelineSequence := make([]gjson.Result, 0)

t.Logf("Doing incremental syncs from %s", nextBatch)

// This just reads all timeline batches into `timelineSequence` until we see `lastEvent` come down
alice.MustSyncUntil(t, client.SyncReq{Since: nextBatch, Filter: filterID}, func(clientUserID string, topLevelSyncJSON gjson.Result) error {
t.Logf("next batch %s", topLevelSyncJSON.Get("next_batch").Str)

roomResult := topLevelSyncJSON.Get("rooms.join." + client.GjsonEscape(roomID) + ".timeline")
if !roomResult.Exists() {
return fmt.Errorf("No entry for room (%s)", roomID)
}

timelineSequence = append(timelineSequence, roomResult)

events := roomResult.Get("events")
if !events.Exists() || !events.IsArray() {
return fmt.Errorf("Invalid events entry (%s)", roomResult.Raw)
}

foundLastEvent := false
for _, ev := range events.Array() {
if ev.Get("event_id").Str == lastEvent.EventID() {
foundLastEvent = true
}
}

if !foundLastEvent {
return fmt.Errorf("Did not find lastEvent (%s) in timeline batch: (%s)", lastEvent.EventID(), roomResult.Raw)
}

return nil
})

t.Logf("Got timeline sequence: %s", timelineSequence)

// Check that we only see Alice's message from before the gap *before*
// we seen any limited batches, and vice versa for Charlie's messages.
limited := false
for _, section := range timelineSequence {
limited = limited || section.Get("limited").Bool()
events := section.Get("events").Array()
for _, ev := range events {
if limited {
if ev.Get("sender").Str == alice.UserID {
t.Fatalf("Got message from alice after limited flag")
}
} else {
if ev.Get("sender").Str == charlie {
t.Fatalf("Got message from remote without limited flag being set")
}
}
}
}

if !limited {
t.Fatalf("No timeline batch for the room was limited")
}
})

t.Run("full", func(t *testing.T) {
// Wait until we see `lastEvent` come down sync implying that all events have been persisted
// by alice's homeserver.
alice.MustSyncUntil(t, client.SyncReq{}, client.SyncTimelineHasEventID(roomID, lastEvent.EventID()))

// Now an incremental sync from before should return a limited batch for
// the room, with just Charlie's messages.
topLevelSyncJSON, _ := alice.MustSync(t, client.SyncReq{Since: nextBatch, Filter: filterID})
roomResult := topLevelSyncJSON.Get("rooms.join." + client.GjsonEscape(roomID))
if !roomResult.Exists() {
t.Fatalf("No entry for room (%s)", roomID)
}

eventsJson := roomResult.Get("timeline.events")
if !eventsJson.Exists() || !eventsJson.IsArray() {
t.Fatalf("Invalid events entry (%s)", roomResult.Raw)
}

eventsArray := eventsJson.Array()

if eventsArray[len(eventsArray)-1].Get("event_id").Str != lastEvent.EventID() {
t.Fatalf("Did not find lastEvent (%s) in timeline batch: (%s)", lastEvent.EventID(), roomResult.Raw)
}

if roomResult.Get("timeline.limited").Bool() == false {
t.Fatalf("Timeline batch was not limited (%s)", roomResult.Raw)
}

for _, ev := range eventsArray {
if ev.Get("sender").Str == alice.UserID {
t.Fatalf("Found an event from alice in batch (%s)", roomResult.Raw)
}
}
})

}

// Test presence from people in 2 different rooms in incremental sync
func TestPresenceSyncDifferentRooms(t *testing.T) {
deployment := complement.Deploy(t, 1)
Expand Down Expand Up @@ -561,3 +749,105 @@ func usersInPresenceEvents(t *testing.T, presence gjson.Result, users []string)
t.Fatalf("expected %d presence events, got %d: %+v", len(users), foundCounter, presenceEvents)
}
}

func eventIDsFromEvents(he []gomatrixserverlib.PDU) []string {
eventIDs := make([]string, len(he))
for i := range he {
eventIDs[i] = he[i].EventID()
}
return eventIDs
}

// Helper method to respond to federation APIs associated with trying to get missing events.
func respondToGetMissingEventsEndpoints(t *testing.T, srv *federation.Server, room *federation.ServerRoom, missingEvents []gomatrixserverlib.PDU) {
srv.Mux().HandleFunc(
"/_matrix/federation/v1/state_ids/{roomID}",
srv.ValidFederationRequest(t, func(fr *fclient.FederationRequest, pathParams map[string]string) util.JSONResponse {
t.Logf("Got /state_ids for %s", pathParams["roomID"])
if pathParams["roomID"] != room.RoomID {
t.Errorf("Received /state_ids for the wrong room: %s", room.RoomID)
return util.JSONResponse{
Code: 400,
JSON: "wrong room",
}
}

roomState := room.AllCurrentState()
return util.JSONResponse{
Code: 200,
JSON: map[string]interface{}{
"pdu_ids": eventIDsFromEvents(roomState),
"auth_chain_ids": eventIDsFromEvents(room.AuthChainForEvents(roomState)),
},
}
})).Methods("GET")

srv.Mux().HandleFunc(
"/_matrix/federation/v1/state/{roomID}",
srv.ValidFederationRequest(t, func(fr *fclient.FederationRequest, pathParams map[string]string) util.JSONResponse {
t.Logf("Got /state for %s", pathParams["roomID"])
if pathParams["roomID"] != room.RoomID {
t.Errorf("Received /state_ids for the wrong room: %s", room.RoomID)
return util.JSONResponse{
Code: 400,
JSON: "wrong room",
}
}

roomState := room.AllCurrentState()
return util.JSONResponse{
Code: 200,
JSON: map[string]interface{}{
"pdus": roomState,
"auth_chain": room.AuthChainForEvents(roomState),
},
}
})).Methods("GET")

srv.Mux().HandleFunc(
"/_matrix/federation/v1/event/{eventID}",
srv.ValidFederationRequest(t, func(fr *fclient.FederationRequest, pathParams map[string]string) util.JSONResponse {
t.Logf("Got /event for %s", pathParams["eventID"])

for _, ev := range missingEvents {
if ev.EventID() == pathParams["eventID"] {
t.Logf("Returning event %s", pathParams["eventID"])
return util.JSONResponse{
Code: 200,
JSON: map[string]interface{}{
"origin": srv.ServerName(),
"origin_server_ts": 0,
"pdus": []json.RawMessage{ev.JSON()},
},
}
}
}

t.Logf("No event found")
return util.JSONResponse{
Code: 404,
JSON: map[string]interface{}{},
}
})).Methods("GET")

srv.Mux().HandleFunc(
"/_matrix/federation/v1/get_missing_events/{roomID}",
srv.ValidFederationRequest(t, func(fr *fclient.FederationRequest, pathParams map[string]string) util.JSONResponse {
t.Logf("Got /get_missing_events for %s", pathParams["roomID"])
if pathParams["roomID"] != room.RoomID {
t.Errorf("Received /get_missing_events for the wrong room: %s", room.RoomID)
return util.JSONResponse{
Code: 400,
JSON: "wrong room",
}
}

return util.JSONResponse{
Code: 200,
JSON: map[string]interface{}{
"events": missingEvents[len(missingEvents)-10:],
},
}
}),
).Methods("POST")
}

0 comments on commit fdfbde6

Please sign in to comment.