Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more tests for receiving events while resyncing for a partial join #419

Merged
Merged
Changes from 8 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
a07b041
Factor out CreateMessageEvent() function
Jul 21, 2022
7b483be
Factor out `testReceiveEventDuringPartialStateJoin()` function
Jul 21, 2022
bd2b688
Add `handleGetMissingEventsRequests()` function to respond to `/get_m…
Jul 21, 2022
d085f11
Test whether the homeserver thinks it has full state at a received event
Jul 21, 2022
2cbeaae
Allow explicitly specified `/state` and `/state_ids` requests to comp…
Jul 21, 2022
894d755
Add simple test for receiving an event with a missing prev event
Jul 21, 2022
0643979
Add a test for receiving an event with one missing and one known prev…
Jul 21, 2022
5e38528
Add a test for receiving an event with a missing prev event, with one…
Jul 21, 2022
2979d24
fixup: s/sent/created/
Jul 22, 2022
a5b3c71
fixup: define StateIDsResult struct and send /state_id failures down …
Jul 22, 2022
8f5b930
fixup: rename new tests to use parents/grandparents terminology
Jul 22, 2022
49e35b4
Revert "Allow explicitly specified `/state` and `/state_ids` requests…
Jul 22, 2022
1adf5aa
Faster joins tests: make request handlers more specific
richvdh Jul 21, 2022
74dc057
fixup: Add explicit /state_id and /state handlers instead
Jul 22, 2022
5a9832f
Log which /state_ids request is being replied to
Jul 22, 2022
d437bfb
fixup: explain purpose of deferred channel read
Jul 22, 2022
8abc0e8
fixup: link to synapse#13288
Jul 22, 2022
8f0b234
Merge remote-tracking branch 'origin/main' into squah/faster_room_joi…
Jul 22, 2022
98907c0
fixup: Use WithRetryUntil to retry /event
Jul 25, 2022
47ce7ba
Use Context.WithTimeout instead of a SendFederationRequest goroutine
Jul 25, 2022
1a0fa9f
Add comment explaining purpose of early /state_ids request
Jul 25, 2022
e3d8197
fixup: Remove .vscode/settings.json
Jul 26, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 82 additions & 68 deletions tests/federation_room_join_partial_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,15 @@ func TestPartialStateJoin(t *testing.T) {
// the HS will make an /event_auth request for the event
federation.HandleEventAuthRequests()(psjResult.Server)

// derek sends an event in the room
event := psjResult.CreateMessageEvent(t, "derek", nil)
t.Logf("Derek sent event event ID %s", event.EventID())
t.Logf("Derek created event with ID %s", event.EventID())

// derek sends an event in the room
testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, event)
})

// we should be able to receive events with missing prevs over federation during the resync
t.Run("CanReceiveEventsWithMissingPrevDuringPartialStateJoin", func(t *testing.T) {
// we should be able to receive events with a missing prev event over federation during the resync
t.Run("CanReceiveEventsWithMissingParentsDuringPartialStateJoin", func(t *testing.T) {
deployment := Deploy(t, b.BlueprintAlice)
defer deployment.Destroy(t)
alice := deployment.Client(t, "hs1", "@alice:hs1")
Expand All @@ -164,16 +165,16 @@ func TestPartialStateJoin(t *testing.T) {
// ... <-- M <-- A <-- B
//
// M is @alice:hs1's join event.
// A and B are regular m.room.messsage events sent by @derek from Complement.
// A and B are regular m.room.messsage events created by @derek on the Complement homeserver.
//
// initially, hs1 only knows about event M.
// we send only event B to hs1.
eventM := psjResult.ServerRoom.CurrentState("m.room.member", alice.UserID)
eventA := psjResult.CreateMessageEvent(t, "derek", []string{eventM.EventID()})
eventB := psjResult.CreateMessageEvent(t, "derek", []string{eventA.EventID()})
t.Logf("%s's m.room.member event is %s", *eventM.StateKey(), eventM.EventID())
t.Logf("Derek sent event A with ID %s", eventA.EventID())
t.Logf("Derek sent event B with ID %s", eventB.EventID())
t.Logf("Derek created event A with ID %s", eventA.EventID())
t.Logf("Derek created event B with ID %s", eventB.EventID())

// the HS will make an /event_auth request for event A
federation.HandleEventAuthRequests()(psjResult.Server)
Expand All @@ -185,8 +186,8 @@ func TestPartialStateJoin(t *testing.T) {
testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, eventB)
})

// we should be able to receive events with partially missing prevs over federation during the resync
t.Run("CanReceiveEventsWithHalfMissingPrevsDuringPartialStateJoin", func(t *testing.T) {
// we should be able to receive events with partially missing prev events over federation during the resync
t.Run("CanReceiveEventsWithHalfMissingParentsDuringPartialStateJoin", func(t *testing.T) {
deployment := Deploy(t, b.BlueprintAlice)
defer deployment.Destroy(t)
alice := deployment.Client(t, "hs1", "@alice:hs1")
Expand All @@ -200,16 +201,16 @@ func TestPartialStateJoin(t *testing.T) {
// ... <-- M <-- A <-- B
//
// M is @alice:hs1's join event.
// A and B are regular m.room.messsage events sent by @derek from Complement.
// A and B are regular m.room.messsage events created by @derek on the Complement homeserver.
//
// initially, hs1 only knows about event M.
// we send only event B to hs1.
eventM := psjResult.ServerRoom.CurrentState("m.room.member", alice.UserID)
eventA := psjResult.CreateMessageEvent(t, "derek", []string{eventM.EventID()})
eventB := psjResult.CreateMessageEvent(t, "derek", []string{eventA.EventID(), eventM.EventID()})
t.Logf("%s's m.room.member event is %s", *eventM.StateKey(), eventM.EventID())
t.Logf("Derek sent event A with ID %s", eventA.EventID())
t.Logf("Derek sent event B with ID %s", eventB.EventID())
t.Logf("Derek created event A with ID %s", eventA.EventID())
t.Logf("Derek created event B with ID %s", eventB.EventID())

// the HS will make an /event_auth request for event A
federation.HandleEventAuthRequests()(psjResult.Server)
Expand All @@ -221,8 +222,9 @@ func TestPartialStateJoin(t *testing.T) {
testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, eventB)
})

// we should be able to receive events with missing prevs over federation during the resync
t.Run("CanReceiveEventsWithMissingPrevWithHalfMissingPrevsDuringPartialStateJoin", func(t *testing.T) {
// we should be able to receive events with a missing prev event, with half missing prev events,
// over federation during the resync
t.Run("CanReceiveEventsWithHalfMissingGrandparentsDuringPartialStateJoin", func(t *testing.T) {
deployment := Deploy(t, b.BlueprintAlice)
defer deployment.Destroy(t)
alice := deployment.Client(t, "hs1", "@alice:hs1")
Expand All @@ -236,7 +238,7 @@ func TestPartialStateJoin(t *testing.T) {
// ... <-- M <-- A <-- B <-- C
//
// M is @alice:hs1's join event.
// A, B and C are regular m.room.messsage events sent by @derek from Complement.
// A, B and C are regular m.room.messsage events created by @derek on the Complement homeserver.
//
// initially, hs1 only knows about event M.
// we send only event C to hs1.
Expand All @@ -245,15 +247,18 @@ func TestPartialStateJoin(t *testing.T) {
eventB := psjResult.CreateMessageEvent(t, "derek", []string{eventA.EventID(), eventM.EventID()})
eventC := psjResult.CreateMessageEvent(t, "derek", []string{eventB.EventID()})
t.Logf("%s's m.room.member event is %s", *eventM.StateKey(), eventM.EventID())
t.Logf("Derek sent event A with ID %s", eventA.EventID())
t.Logf("Derek sent event B with ID %s", eventB.EventID())
t.Logf("Derek sent event C with ID %s", eventC.EventID())
psjResult.AllowStateRequestForEvent(eventA.EventID())
t.Logf("Derek created event A with ID %s", eventA.EventID())
t.Logf("Derek created event B with ID %s", eventB.EventID())
t.Logf("Derek created event C with ID %s", eventC.EventID())

// the HS will make a /get_missing_events request for the missing prev event of event C,
// to which we respond with event B only.
handleGetMissingEventsRequests(t, psjResult.Server, psjResult.ServerRoom, []*gomatrixserverlib.Event{eventB})

// dedicated state_ids and state handlers for event A
handleStateIdsRequests(t, psjResult.Server, psjResult.ServerRoom, eventA.EventID(), psjResult.ServerRoom.AllCurrentState(), nil, nil)
handleStateRequests(t, psjResult.Server, psjResult.ServerRoom, eventA.EventID(), psjResult.ServerRoom.AllCurrentState(), nil, nil)

// send event C to hs1
testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, eventC)
})
Expand Down Expand Up @@ -602,8 +607,13 @@ func testReceiveEventDuringPartialStateJoin(
// * block because the homeserver does not have full state at the last event
// * or 403 because the homeserver does not have full state yet and does not consider the
// Complement homeserver to be in the room
stateIdsResponseChan := make(chan *gomatrixserverlib.RespStateIDs)
defer close(stateIdsResponseChan)

type StateIDsResult struct {
RespStateIDs gomatrixserverlib.RespStateIDs
Error error
}
stateIdsResultChan := make(chan StateIDsResult)
defer close(stateIdsResultChan)
go func() {
stateReq := gomatrixserverlib.NewFederationRequest("GET", "hs1",
fmt.Sprintf("/_matrix/federation/v1/state_ids/%s?event_id=%s",
Expand All @@ -613,26 +623,28 @@ func testReceiveEventDuringPartialStateJoin(
)
var respStateIDs gomatrixserverlib.RespStateIDs
if err := psjResult.Server.SendFederationRequest(deployment, stateReq, &respStateIDs); err != nil {
httpErr, ok := err.(gomatrix.HTTPError)
t.Logf("%v", httpErr)
if ok && httpErr.Code == 403 {
stateIdsResponseChan <- nil
return
}
t.Errorf("/state_ids request returned non-200: %s", err)
return
stateIdsResultChan <- StateIDsResult{Error: err}
} else {
stateIdsResultChan <- StateIDsResult{RespStateIDs: respStateIDs}
}
stateIdsResponseChan <- &respStateIDs
}()

select {
case <-time.After(1 * time.Second):
t.Logf("/state_ids request for event %s blocked as expected", event.EventID())
defer func() { <-stateIdsResponseChan }()
// read from the channel and discard the result, so that the goroutine above doesn't block
// indefinitely on the send
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(I'm a bit surprised that closing the channel isn't sufficient for this, but fair enough)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We get a panic: send on closed channel without this. To clarify:

no close and no read -> complement times out after 10 minutes
defer close + no read -> panic
defer close + defer read -> complement is happy

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really see why this is done in a goroutine at all - is this because SendFederationRequest can block indefinitely? There is a default 30s timeout - if that is too long then you need to send a custom context with a timeout here -

return httpClient.DoRequestAndParseResponse(context.Background(), httpReq, resBody)
- currently this is context.Background() so it won't ever expire, but you can change that easily by doing:

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
httpClient.DoRequestAndParseResponse(ctx, httpReq, resBody)

This will require a change to Complement's API to:

func (s *Server) SendFederationRequest(ctx context.Context, deployment *docker.Deployment, req gomatrixserverlib.FederationRequest, resBody interface{}) error {

but that's fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion, that's a lot cleaner!

defer func() { <-stateIdsResultChan }()
break
case respStateIDs := <-stateIdsResponseChan:
if respStateIDs == nil {
t.Logf("/state_ids request for event %s returned 403 as expected", event.EventID())
case stateIDsResult := <-stateIdsResultChan:
if stateIDsResult.Error != nil {
httpErr, ok := stateIDsResult.Error.(gomatrix.HTTPError)
t.Logf("%v", httpErr)
if ok && httpErr.Code == 403 {
t.Logf("/state_ids request for event %s returned 403 as expected", event.EventID())
} else {
t.Errorf("/state_ids request returned non-200: %s", stateIDsResult.Error)
}
} else {
// since we have not yet given the homeserver the full state at the join event and allowed
// the partial join to complete, it can't possibly know the full state at the last event.
Expand Down Expand Up @@ -699,8 +711,6 @@ type partialStateJoinResult struct {
ServerRoom *federation.ServerRoom
fedStateIdsRequestReceivedWaiter *Waiter
fedStateIdsSendResponseWaiter *Waiter
// the set of events for which we will not block `/state` or `/state_ids` requests.
fedStateIdsAllowedEvents map[string]bool
}

// beginPartialStateJoin spins up a room on a complement server,
Expand Down Expand Up @@ -736,7 +746,6 @@ func beginPartialStateJoin(t *testing.T, deployment *docker.Deployment, joiningU
// some things for orchestration
result.fedStateIdsRequestReceivedWaiter = NewWaiter()
result.fedStateIdsSendResponseWaiter = NewWaiter()
result.fedStateIdsAllowedEvents = make(map[string]bool)

// create the room on the complement server, with charlie and derek as members
roomVer := joiningUser.GetDefaultRoomVersion(t)
Expand All @@ -750,19 +759,23 @@ func beginPartialStateJoin(t *testing.T, deployment *docker.Deployment, joiningU
},
}))

// register a handler for /state_ids requests, which finishes fedStateIdsRequestReceivedWaiter, then
// register a handler for /state_ids requests for the most recent event,
// which finishes fedStateIdsRequestReceivedWaiter, then
// waits for fedStateIdsSendResponseWaiter and sends a reply
lastEvent := result.ServerRoom.Timeline[len(result.ServerRoom.Timeline)-1]
currentState := result.ServerRoom.AllCurrentState()
handleStateIdsRequests(
t,
result.Server,
result.ServerRoom,
result.fedStateIdsRequestReceivedWaiter,
result.fedStateIdsSendResponseWaiter,
result.fedStateIdsAllowedEvents,
t, result.Server, result.ServerRoom,
lastEvent.EventID(), currentState,
result.fedStateIdsRequestReceivedWaiter, result.fedStateIdsSendResponseWaiter,
)

// a handler for /state requests, which sends a sensible response
handleStateRequests(t, result.Server, result.ServerRoom, nil, nil, nil)
handleStateRequests(
t, result.Server, result.ServerRoom,
lastEvent.EventID(), currentState,
nil, nil,
)

// have joiningUser join the room by room ID.
joiningUser.JoinRoom(t, result.ServerRoom.RoomID, []string{result.Server.ServerName()})
Expand Down Expand Up @@ -810,12 +823,6 @@ func (psj *partialStateJoinResult) CreateMessageEvent(t *testing.T, senderLocalp
return event
}

// allow a /state_ids request for a given event to complete before FinishStateRequest has been called.
// only applies to new incoming requests, and not any currently blocked ones.
func (psj *partialStateJoinResult) AllowStateRequestForEvent(eventID string) {
psj.fedStateIdsAllowedEvents[eventID] = true
}

// wait for a /state_ids request for the test room to arrive
func (psj *partialStateJoinResult) AwaitStateIdsRequest(t *testing.T) {
psj.fedStateIdsRequestReceivedWaiter.Waitf(t, 5*time.Second, "Waiting for /state_ids request")
Expand All @@ -826,31 +833,34 @@ func (psj *partialStateJoinResult) FinishStateRequest() {
psj.fedStateIdsSendResponseWaiter.Finish()
}

// handleStateIdsRequests registers a handler for /state_ids requests for serverRoom.
// handleStateIdsRequests registers a handler for /state_ids requests for 'eventID'
//
// the returned state is as passed in 'roomState'
//
// if requestReceivedWaiter is not nil, it will be Finish()ed when the request arrives.
// if sendResponseWaiter is not nil, we will Wait() for it to finish before sending the response.
func handleStateIdsRequests(
t *testing.T, srv *federation.Server, serverRoom *federation.ServerRoom,
requestReceivedWaiter *Waiter, sendResponseWaiter *Waiter, allowedEvents map[string]bool,
eventID string, roomState []*gomatrixserverlib.Event,
requestReceivedWaiter *Waiter, sendResponseWaiter *Waiter,
) {
srv.Mux().Handle(
srv.Mux().NewRoute().Methods("GET").Path(
fmt.Sprintf("/_matrix/federation/v1/state_ids/%s", serverRoom.RoomID),
).Queries("event_id", eventID).Handler(
http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
queryParams := req.URL.Query()
t.Logf("Incoming state_ids request for event %s in room %s", queryParams["event_id"], serverRoom.RoomID)
if requestReceivedWaiter != nil {
requestReceivedWaiter.Finish()
}
if !allowedEvents[queryParams["event_id"][0]] &&
sendResponseWaiter != nil {
if sendResponseWaiter != nil {
sendResponseWaiter.Waitf(t, 60*time.Second, "Waiting for /state_ids request")
}
t.Logf("Replying to /state_ids request")
t.Logf("Replying to /state_ids request for event %s", queryParams["event_id"])

res := gomatrixserverlib.RespStateIDs{
AuthEventIDs: eventIDsFromEvents(serverRoom.AuthChain()),
StateEventIDs: eventIDsFromEvents(serverRoom.AllCurrentState()),
AuthEventIDs: eventIDsFromEvents(serverRoom.AuthChainForEvents(roomState)),
StateEventIDs: eventIDsFromEvents(roomState),
}
w.WriteHeader(200)
jsonb, _ := json.Marshal(res)
Expand All @@ -859,32 +869,36 @@ func handleStateIdsRequests(
t.Errorf("Error writing to request: %v", err)
}
}),
).Methods("GET")
)
t.Logf("Registered state_ids handler for event %s", eventID)
}

// makeStateHandler returns a handler for /state requests for serverRoom.
// makeStateHandler returns a handler for /state requests for 'eventID'
//
// the returned state is as passed in 'roomState'
//
// if requestReceivedWaiter is not nil, it will be Finish()ed when the request arrives.
// if sendResponseWaiter is not nil, we will Wait() for it to finish before sending the response.
func handleStateRequests(
t *testing.T, srv *federation.Server, serverRoom *federation.ServerRoom,
requestReceivedWaiter *Waiter, sendResponseWaiter *Waiter, allowedEvents map[string]bool,
eventID string, roomState []*gomatrixserverlib.Event,
requestReceivedWaiter *Waiter, sendResponseWaiter *Waiter,
) {
srv.Mux().Handle(
srv.Mux().NewRoute().Methods("GET").Path(
fmt.Sprintf("/_matrix/federation/v1/state/%s", serverRoom.RoomID),
).Queries("event_id", eventID).Handler(
http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
queryParams := req.URL.Query()
t.Logf("Incoming state request for event %s in room %s", queryParams["event_id"], serverRoom.RoomID)
if requestReceivedWaiter != nil {
requestReceivedWaiter.Finish()
}
if !allowedEvents[queryParams["event_id"][0]] &&
sendResponseWaiter != nil {
if sendResponseWaiter != nil {
sendResponseWaiter.Waitf(t, 60*time.Second, "Waiting for /state request")
}
res := gomatrixserverlib.RespState{
AuthEvents: gomatrixserverlib.NewEventJSONsFromEvents(serverRoom.AuthChain()),
StateEvents: gomatrixserverlib.NewEventJSONsFromEvents(serverRoom.AllCurrentState()),
AuthEvents: gomatrixserverlib.NewEventJSONsFromEvents(serverRoom.AuthChainForEvents(roomState)),
StateEvents: gomatrixserverlib.NewEventJSONsFromEvents(roomState),
}
w.WriteHeader(200)
jsonb, _ := json.Marshal(res)
Expand All @@ -893,7 +907,7 @@ func handleStateRequests(
t.Errorf("Error writing to request: %v", err)
}
}),
).Methods("GET")
)
}

// register a handler for `/get_missing_events` requests
Expand Down