From 38c1bda277329300f6ff39f6afb3bc89a8fcec86 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 13 Apr 2023 19:34:37 +0100 Subject: [PATCH 01/39] Integration test --- tests-integration/poller_test.go | 77 ++++++++++++++++++++++++++++++++ tests-integration/v3_test.go | 5 +++ 2 files changed, 82 insertions(+) diff --git a/tests-integration/poller_test.go b/tests-integration/poller_test.go index 6a008d96..5759b605 100644 --- a/tests-integration/poller_test.go +++ b/tests-integration/poller_test.go @@ -97,3 +97,80 @@ func TestSecondPollerFiltersToDevice(t *testing.T) { res = v3.mustDoV3RequestWithPos(t, deviceBToken, res.Pos, sync3.Request{}) m.MatchResponse(t, res, m.MatchToDeviceMessages([]json.RawMessage{wantMsg})) } + +// TODO test description +func TestPollerHandlesUnknownStateEventsOnIncrementalSync(t *testing.T) { + pqString := testutils.PrepareDBConnectionString() + v2 := runTestV2Server(t) + v3 := runTestServer(t, v2, pqString) + defer v2.close() + defer v3.close() + deviceAToken := "DEVICE_A_TOKEN" + v2.addAccount(alice, deviceAToken) + const roomID = "!unimportant" + v2.queueResponse(deviceAToken, sync2.SyncResponse{ + Rooms: sync2.SyncRoomsResponse{ + Join: v2JoinTimeline(roomEvents{ + roomID: roomID, + events: createRoomState(t, alice, time.Now()), + }), + }, + }) + res := v3.mustDoV3Request(t, deviceAToken, sync3.Request{ + Lists: map[string]sync3.RequestList{ + "a": { + Ranges: [][2]int64{{0, 20}}, + }, + }, + }) + + t.Log("The poller receives a gappy incremental sync response with a state block") + nameEvent := testutils.NewStateEvent( + t, + "m.room.name", + "", + alice, + map[string]interface{}{"name": "banana"}, + ) + powerLevelsEvent := testutils.NewStateEvent( + t, + "m.room.power_levels", + "", + alice, + map[string]interface{}{ + "users": map[string]int{alice: 100}, + "events_default": 10, + }, + ) + messageEvent := testutils.NewMessageEvent(t, alice, "hello") + v2.queueResponse(deviceAToken, sync2.SyncResponse{ + Rooms: sync2.SyncRoomsResponse{ + Join: map[string]sync2.SyncV2JoinResponse{ + roomID: { + State: sync2.EventsResponse{ + Events: []json.RawMessage{nameEvent, powerLevelsEvent}, + }, + Timeline: sync2.TimelineResponse{ + Events: []json.RawMessage{messageEvent}, + Limited: true, + PrevBatch: "batchymcbatchface", + }, + }, + }, + }, + }) + + res = v3.mustDoV3RequestWithPos(t, deviceAToken, res.Pos, sync3.Request{}) + m.MatchResponse( + t, + res, + m.MatchRoomSubscription( + roomID, + m.MatchRoomTimeline([]json.RawMessage{nameEvent, powerLevelsEvent, messageEvent}), + ), + ) +} + +func eventIDFromRawMessage(message json.RawMessage) string { + return gjson.ParseBytes(message).Get("event_id").Str +} diff --git a/tests-integration/v3_test.go b/tests-integration/v3_test.go index 245d4447..0aea5dfb 100644 --- a/tests-integration/v3_test.go +++ b/tests-integration/v3_test.go @@ -40,7 +40,12 @@ var ( boolTrue = true ) +// testV2Server is a fake stand-in for the v2 sync API provided by a homeserver. type testV2Server struct { + // CheckRequest is an arbitrary function which runs after a request has been + // received from pollers, but before the resposne is generated. This allows us to + // confirm that the proxy is polling the homeserver's v2 sync endpoint in the + // manner that we expect. CheckRequest func(userID, token string, req *http.Request) mu *sync.Mutex tokenToUser map[string]string From 1d59167feb5159ff02c848d3f8479b082dd461e3 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 13 Apr 2023 18:47:15 +0100 Subject: [PATCH 02/39] E2E test case draft --- tests-e2e/client_test.go | 68 +++++++++++++++++++++++++++++++++-- tests-e2e/gappy_state_test.go | 62 ++++++++++++++++++++++++++++++++ 2 files changed, 127 insertions(+), 3 deletions(-) create mode 100644 tests-e2e/gappy_state_test.go diff --git a/tests-e2e/client_test.go b/tests-e2e/client_test.go index 76c69cb1..7818889c 100644 --- a/tests-e2e/client_test.go +++ b/tests-e2e/client_test.go @@ -236,9 +236,8 @@ func (c *CSAPI) SetRoomAccountData(t *testing.T, roomID, eventType string, conte return c.MustDoFunc(t, "PUT", []string{"_matrix", "client", "v3", "user", c.UserID, "rooms", roomID, "account_data", eventType}, WithJSONBody(t, content)) } -// SendEventSynced sends `e` into the room and waits for its event ID to come down /sync. -// Returns the event ID of the sent event. -func (c *CSAPI) SendEventSynced(t *testing.T, roomID string, e Event) string { +// SendEventUnsynced sends `e` into the room and returns the event ID of the sent event. +func (c *CSAPI) SendEventUnsynced(t *testing.T, roomID string, e Event) string { t.Helper() c.txnID++ paths := []string{"_matrix", "client", "v3", "rooms", roomID, "send", e.Type, strconv.Itoa(c.txnID)} @@ -248,6 +247,14 @@ func (c *CSAPI) SendEventSynced(t *testing.T, roomID string, e Event) string { res := c.MustDo(t, "PUT", paths, e.Content) body := ParseJSON(t, res) eventID := GetJSONFieldStr(t, body, "event_id") + return eventID +} + +// SendEventSynced sends `e` into the room and waits for its event ID to come down /sync. +// Returns the event ID of the sent event. +func (c *CSAPI) SendEventSynced(t *testing.T, roomID string, e Event) string { + t.Helper() + eventID := c.SendEventUnsynced(t, roomID, e) t.Logf("SendEventSynced waiting for event ID %s", eventID) c.MustSyncUntil(t, SyncReq{}, SyncTimelineHas(roomID, func(r gjson.Result) bool { return r.Get("event_id").Str == eventID @@ -405,6 +412,61 @@ func (c *CSAPI) RegisterUser(t *testing.T, localpart, password string) (userID, return userID, accessToken, deviceID } +// LoginUser will create a new device for the given user. +// The new access token and device ID are overwrite those of the current CSAPI instance. +func (c *CSAPI) Login(t *testing.T, password, deviceID string) { + t.Helper() + reqBody := map[string]interface{}{ + "type": "m.login.password", + "password": password, + "identifier": map[string]interface{}{ + "type": "m.id.user", + "user": c.UserID, + }, + "device_id": deviceID, + } + res := c.MustDoFunc(t, "POST", []string{"_matrix", "client", "v3", "login"}, WithJSONBody(t, reqBody)) + + body, err := io.ReadAll(res.Body) + if err != nil { + t.Fatalf("unable to read response body: %v", err) + } + + userID := gjson.GetBytes(body, "user_id").Str + if c.UserID != userID { + t.Fatalf("Logged in as %s but response included user_id=%s", c.UserID, userID) + } + gotDeviceID := gjson.GetBytes(body, "device_id").Str + if gotDeviceID != deviceID { + t.Fatalf("Asked for device ID %s but got %s", deviceID, gotDeviceID) + } + accessToken := gjson.GetBytes(body, "access_token").Str + if c.AccessToken == accessToken { + t.Fatalf("Logged in as %s but access token did not change (still %s)", c.UserID, c.AccessToken) + } + + c.AccessToken = accessToken + c.DeviceID = deviceID +} + +// SetState PUTs a piece of state in a room and returns the event ID of the created state event. +func (c *CSAPI) SetState(t *testing.T, roomID, eventType, stateKey string, content map[string]interface{}) string { + t.Helper() + res := c.MustDoFunc( + t, + "PUT", + []string{"_matrix", "client", "v3", "rooms", roomID, "state", eventType, stateKey}, + WithJSONBody(t, content), + ) + + body, err := io.ReadAll(res.Body) + if err != nil { + t.Fatalf("unable to read response body: %v", err) + } + + return gjson.ParseBytes(body).Get("event_id").Str +} + // RegisterSharedSecret registers a new account with a shared secret via HMAC // See https://github.com/matrix-org/synapse/blob/e550ab17adc8dd3c48daf7fedcd09418a73f524b/synapse/_scripts/register_new_matrix_user.py#L40 func (c *CSAPI) RegisterSharedSecret(t *testing.T, user, pass string, isAdmin bool) (userID, accessToken, deviceID string) { diff --git a/tests-e2e/gappy_state_test.go b/tests-e2e/gappy_state_test.go new file mode 100644 index 00000000..7a211f8e --- /dev/null +++ b/tests-e2e/gappy_state_test.go @@ -0,0 +1,62 @@ +package syncv3_test + +import ( + "github.com/matrix-org/sliding-sync/sync3" + "github.com/matrix-org/sliding-sync/testutils/m" + "testing" +) + +func TestGappyState(t *testing.T) { + t.Log("Alice registers on the homeserver.") + alice := registerNewUser(t) + + t.Log("Alice creates a room") + roomID := alice.CreateRoom(t, map[string]interface{}{"preset": "public_chat"}) + + t.Log("Alice sends a message into that room") + alice.SendEventSynced(t, roomID, Event{ + Type: "m.room.message", + Content: map[string]interface{}{ + "msgtype": "m.text", + "body": "Hello, world!", + }, + }) + + t.Log("Alice logs out of her first device.") + alice.MustDoFunc(t, "POST", []string{"_matrix", "client", "v3", "logout"}) + + t.Log("Alice logs in again on her second device.") + alice.Login(t, "password", "device2") + + t.Log("Alice sets two pieces of room state while the proxy isn't polling.") + nameContent := map[string]interface{}{"name": "potato"} + nameID := alice.SetState(t, roomID, "m.room.name", "", nameContent) + powerLevelState := map[string]interface{}{ + "users": map[string]int{alice.UserID: 100}, + "events_default": 10, + } + powerLevelID := alice.SetState(t, roomID, "m.room.power_levels", "", powerLevelState) + + t.Log("Alice logs out of her second device.") + alice.MustDoFunc(t, "POST", []string{"_matrix", "client", "v3", "logout"}) + + t.Log("Alice logs in again on her third device.") + alice.Login(t, "password", "device3") + + t.Log("Alice does an initial sliding sync.") + syncResp := alice.SlidingSync(t, + sync3.Request{ + Lists: map[string]sync3.RequestList{ + "a": { + Ranges: [][2]int64{{0, 20}}, + }, + }, + }, + ) + + t.Log("She should see her latest state events in the response") + // TODO: Behind the scenes, this should have created a new poller which did a v2 + // initial sync. The v2 response should include the full state of the room, which + // we should relay in the sync response. Need to express this with a matcher. + m.MatchResponse(t, syncResp, m.LogResponse(t)) +} From 46059df3ed19994ddf5d11703dbac2bbd8499487 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Fri, 14 Apr 2023 20:22:51 +0100 Subject: [PATCH 03/39] WIP: update test cases --- tests-e2e/gappy_state_test.go | 50 ++++++++++++++++++++++---------- tests-integration/poller_test.go | 5 +--- 2 files changed, 35 insertions(+), 20 deletions(-) diff --git a/tests-e2e/gappy_state_test.go b/tests-e2e/gappy_state_test.go index 7a211f8e..2641eb41 100644 --- a/tests-e2e/gappy_state_test.go +++ b/tests-e2e/gappy_state_test.go @@ -1,8 +1,10 @@ package syncv3_test import ( + "fmt" "github.com/matrix-org/sliding-sync/sync3" "github.com/matrix-org/sliding-sync/testutils/m" + "github.com/tidwall/gjson" "testing" ) @@ -28,22 +30,23 @@ func TestGappyState(t *testing.T) { t.Log("Alice logs in again on her second device.") alice.Login(t, "password", "device2") - t.Log("Alice sets two pieces of room state while the proxy isn't polling.") + t.Log("Alice changes the room name while the proxy isn't polling.") nameContent := map[string]interface{}{"name": "potato"} - nameID := alice.SetState(t, roomID, "m.room.name", "", nameContent) - powerLevelState := map[string]interface{}{ - "users": map[string]int{alice.UserID: 100}, - "events_default": 10, - } - powerLevelID := alice.SetState(t, roomID, "m.room.power_levels", "", powerLevelState) - - t.Log("Alice logs out of her second device.") - alice.MustDoFunc(t, "POST", []string{"_matrix", "client", "v3", "logout"}) + alice.SetState(t, roomID, "m.room.name", "", nameContent) - t.Log("Alice logs in again on her third device.") - alice.Login(t, "password", "device3") + t.Log("Alice sends lots of message events (more than the poller will request in a timeline.") + var latestMessageID string + for i := 0; i < 51; i++ { + latestMessageID = alice.SendEventUnsynced(t, roomID, Event{ + Type: "m.room.message", + Content: map[string]interface{}{ + "msgtype": "m.text", + "body": fmt.Sprintf("Message number %d", i), + }, + }) + } - t.Log("Alice does an initial sliding sync.") + t.Log("Alice requests an initial sliding sync on device 2.") syncResp := alice.SlidingSync(t, sync3.Request{ Lists: map[string]sync3.RequestList{ @@ -54,9 +57,24 @@ func TestGappyState(t *testing.T) { }, ) - t.Log("She should see her latest state events in the response") + t.Log("She should her latest message with the room updated") // TODO: Behind the scenes, this should have created a new poller which did a v2 // initial sync. The v2 response should include the full state of the room, which - // we should relay in the sync response. Need to express this with a matcher. - m.MatchResponse(t, syncResp, m.LogResponse(t)) + // we should relay in the sync response. + m.MatchResponse( + t, + syncResp, + m.LogResponse(t), + m.MatchRoomSubscription( + roomID, + m.MatchRoomName("potato"), + func(r sync3.Room) error { + lastReceivedEventID := gjson.ParseBytes(r.Timeline[len(r.Timeline)-1]).Get("event_id").Str + if lastReceivedEventID != latestMessageID { + return fmt.Errorf("last message in response is %s, expected %s", lastReceivedEventID, latestMessageID) + } + return nil + }, + ), + ) } diff --git a/tests-integration/poller_test.go b/tests-integration/poller_test.go index 5759b605..1c6098fc 100644 --- a/tests-integration/poller_test.go +++ b/tests-integration/poller_test.go @@ -167,10 +167,7 @@ func TestPollerHandlesUnknownStateEventsOnIncrementalSync(t *testing.T) { m.MatchRoomSubscription( roomID, m.MatchRoomTimeline([]json.RawMessage{nameEvent, powerLevelsEvent, messageEvent}), + m.MatchRoomName("banana"), ), ) } - -func eventIDFromRawMessage(message json.RawMessage) string { - return gjson.ParseBytes(message).Get("event_id").Str -} From 419e1abeeeaf8e5730495595b03b126ab7c74d76 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 17 Apr 2023 12:31:21 +0100 Subject: [PATCH 04/39] Update test case again --- tests-e2e/client_test.go | 1 + tests-e2e/gappy_state_test.go | 26 ++++++++++++++++++++------ 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/tests-e2e/client_test.go b/tests-e2e/client_test.go index 7818889c..4411056b 100644 --- a/tests-e2e/client_test.go +++ b/tests-e2e/client_test.go @@ -251,6 +251,7 @@ func (c *CSAPI) SendEventUnsynced(t *testing.T, roomID string, e Event) string { } // SendEventSynced sends `e` into the room and waits for its event ID to come down /sync. +// NB: This is specifically v2 sync, not v3 sliding sync!! // Returns the event ID of the sent event. func (c *CSAPI) SendEventSynced(t *testing.T, roomID string, e Event) string { t.Helper() diff --git a/tests-e2e/gappy_state_test.go b/tests-e2e/gappy_state_test.go index 2641eb41..26637e95 100644 --- a/tests-e2e/gappy_state_test.go +++ b/tests-e2e/gappy_state_test.go @@ -13,7 +13,8 @@ func TestGappyState(t *testing.T) { alice := registerNewUser(t) t.Log("Alice creates a room") - roomID := alice.CreateRoom(t, map[string]interface{}{"preset": "public_chat"}) + firstRoomName := "Romeo Oscar Oscar Mike" + roomID := alice.CreateRoom(t, map[string]interface{}{"preset": "public_chat", "name": firstRoomName}) t.Log("Alice sends a message into that room") alice.SendEventSynced(t, roomID, Event{ @@ -24,6 +25,22 @@ func TestGappyState(t *testing.T) { }, }) + t.Log("Alice requests an initial sliding sync on device 1.") + syncResp := alice.SlidingSync(t, + sync3.Request{ + Lists: map[string]sync3.RequestList{ + "a": { + Ranges: [][2]int64{{0, 20}}, + }, + }, + }, + ) + m.MatchResponse( + t, + syncResp, + m.MatchRoomSubscription(roomID, m.MatchRoomName(firstRoomName)), + ) + t.Log("Alice logs out of her first device.") alice.MustDoFunc(t, "POST", []string{"_matrix", "client", "v3", "logout"}) @@ -47,7 +64,7 @@ func TestGappyState(t *testing.T) { } t.Log("Alice requests an initial sliding sync on device 2.") - syncResp := alice.SlidingSync(t, + syncResp = alice.SlidingSync(t, sync3.Request{ Lists: map[string]sync3.RequestList{ "a": { @@ -57,10 +74,7 @@ func TestGappyState(t *testing.T) { }, ) - t.Log("She should her latest message with the room updated") - // TODO: Behind the scenes, this should have created a new poller which did a v2 - // initial sync. The v2 response should include the full state of the room, which - // we should relay in the sync response. + t.Log("She should her latest message with the room name updated") m.MatchResponse( t, syncResp, From 4b90454e92a973cae634c6311098ef4c2e9405c5 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 17 Apr 2023 15:17:28 +0100 Subject: [PATCH 05/39] Don't reselect NIDs after initialising a room We fetch those NIDs above in `eventIDToNID` by adding a `RETURNING` clause to the `INSERT` statement. --- state/accumulator.go | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/state/accumulator.go b/state/accumulator.go index db775777..c641a9a1 100644 --- a/state/accumulator.go +++ b/state/accumulator.go @@ -192,20 +192,13 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (bool, membershipEventIDs[events[i].ID] = struct{}{} } } - idToNIDs, err := a.eventsTable.SelectNIDsByIDs(txn, eventIDs) - if err != nil { - return fmt.Errorf("failed to select NIDs for inserted events: %w", err) - } - if len(idToNIDs) != len(eventIDs) { - return fmt.Errorf("missing events just inserted, asked for %v got %v", eventIDs, idToNIDs) - } - memberNIDs := make([]int64, 0, len(idToNIDs)) - otherNIDs := make([]int64, 0, len(idToNIDs)) - for evID, nid := range idToNIDs { + memberNIDs := make([]int64, 0, len(eventIDToNID)) + otherNIDs := make([]int64, 0, len(eventIDToNID)) + for evID, nid := range eventIDToNID { if _, exists := membershipEventIDs[evID]; exists { - memberNIDs = append(memberNIDs, nid) + memberNIDs = append(memberNIDs, int64(nid)) } else { - otherNIDs = append(otherNIDs, nid) + otherNIDs = append(otherNIDs, int64(nid)) } } From a2cba6e59bdb583f09fca75801d16c97d99ed61c Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 17 Apr 2023 15:35:03 +0100 Subject: [PATCH 06/39] We don't need `eventIDs` either --- state/accumulator.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/state/accumulator.go b/state/accumulator.go index c641a9a1..e1e58c40 100644 --- a/state/accumulator.go +++ b/state/accumulator.go @@ -184,12 +184,10 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (bool, } // pull out the event NIDs we just inserted - eventIDs := make([]string, len(events)) membershipEventIDs := make(map[string]struct{}, len(events)) - for i := range eventIDs { - eventIDs[i] = events[i].ID - if events[i].Type == "m.room.member" { - membershipEventIDs[events[i].ID] = struct{}{} + for _, event := range events { + if event.Type == "m.room.member" { + membershipEventIDs[event.ID] = struct{}{} } } memberNIDs := make([]int64, 0, len(eventIDToNID)) From 666823d21137eaad6d27cdb3613ce2ed0528719c Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 17 Apr 2023 18:21:58 +0100 Subject: [PATCH 07/39] Introduce return struct for Initialise --- state/accumulator.go | 31 ++++++++++++++++++++----------- state/storage.go | 2 +- state/storage_test.go | 6 +++--- sync2/handler2/handler.go | 6 +++--- 4 files changed, 27 insertions(+), 18 deletions(-) diff --git a/state/accumulator.go b/state/accumulator.go index e1e58c40..731546eb 100644 --- a/state/accumulator.go +++ b/state/accumulator.go @@ -131,21 +131,30 @@ func (a *Accumulator) roomInfoDelta(roomID string, events []Event) RoomInfo { } } +type InitialiseResult struct { + // AddedEvents is true iff this call to Initialise added new state events to the DB. + AddedEvents bool + // SnapshotID is the ID of the snapshot which incorporates all added events. + // It has no meaning if AddedEvents is False. + SnapshotID int64 + // PrependTimelineEvents is empty if the room was not initialised prior to this call. + // Otherwise, it is an order-preserving subset of the `state` argument to Initialise + // containing all events that were not persisted prior to the Initialise call. These + // should be prepended to the room timeline by the caller. + PrependTimelineEvents []json.RawMessage +} + // Initialise starts a new sync accumulator for the given room using the given state as a baseline. // This will only take effect if this is the first time the v3 server has seen this room, and it wasn't -// possible to get all events up to the create event (e.g Matrix HQ). Returns true if this call actually -// added new events, along with the snapshot NID. -// +// possible to get all events up to the create event (e.g Matrix HQ). // This function: // - Stores these events // - Sets up the current snapshot based on the state list given. -func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (bool, int64, error) { +func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (res InitialiseResult, err error) { if len(state) == 0 { - return false, 0, nil + return res, nil } - addedEvents := false - var snapID int64 - err := sqlutil.WithTransaction(a.db, func(txn *sqlx.Tx) error { + err = sqlutil.WithTransaction(a.db, func(txn *sqlx.Tx) error { // Attempt to short-circuit. This has to be done inside a transaction to make sure // we don't race with multiple calls to Initialise with the same room ID. snapshotID, err := a.roomsTable.CurrentAfterSnapshotID(txn, roomID) @@ -210,7 +219,7 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (bool, if err != nil { return fmt.Errorf("failed to insert snapshot: %w", err) } - addedEvents = true + res.AddedEvents = true latestNID := int64(0) for _, nid := range otherNIDs { if nid > latestNID { @@ -235,10 +244,10 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (bool, // will have an associated state snapshot ID on the event. // Set the snapshot ID as the current state - snapID = snapshot.SnapshotID + res.snapID = snapshot.SnapshotID return a.roomsTable.Upsert(txn, info, snapshot.SnapshotID, latestNID) }) - return addedEvents, snapID, err + return res, err } // Accumulate internal state from a user's sync response. The timeline order MUST be in the order diff --git a/state/storage.go b/state/storage.go index 23d5c03c..71b4b7db 100644 --- a/state/storage.go +++ b/state/storage.go @@ -309,7 +309,7 @@ func (s *Storage) Accumulate(roomID, prevBatch string, timeline []json.RawMessag return s.accumulator.Accumulate(roomID, prevBatch, timeline) } -func (s *Storage) Initialise(roomID string, state []json.RawMessage) (bool, int64, error) { +func (s *Storage) Initialise(roomID string, state []json.RawMessage) (InitialiseResult, error) { return s.accumulator.Initialise(roomID, state) } diff --git a/state/storage_test.go b/state/storage_test.go index 8363cdd3..3888b3ba 100644 --- a/state/storage_test.go +++ b/state/storage_test.go @@ -282,7 +282,7 @@ func TestVisibleEventNIDsBetween(t *testing.T) { }, } for roomID, eventMap := range roomIDToEventMap { - _, _, err := store.Initialise(roomID, eventMap) + _, err := store.Initialise(roomID, eventMap) if err != nil { t.Fatalf("Initialise on %s failed: %s", roomID, err) } @@ -512,7 +512,7 @@ func TestStorageLatestEventsInRoomsPrevBatch(t *testing.T) { }, } - _, _, err := store.Initialise(roomID, stateEvents) + _, err := store.Initialise(roomID, stateEvents) if err != nil { t.Fatalf("failed to initialise: %s", err) } @@ -616,7 +616,7 @@ func TestGlobalSnapshot(t *testing.T) { store := NewStorage(postgresConnectionString) defer store.Teardown() for roomID, stateEvents := range roomIDToEventMap { - _, _, err := store.Initialise(roomID, stateEvents) + _, err := store.Initialise(roomID, stateEvents) assertNoError(t, err) } snapshot, err := store.GlobalSnapshot() diff --git a/sync2/handler2/handler.go b/sync2/handler2/handler.go index 32010a86..030c6c4c 100644 --- a/sync2/handler2/handler.go +++ b/sync2/handler2/handler.go @@ -241,19 +241,19 @@ func (h *Handler) Accumulate(deviceID, roomID, prevBatch string, timeline []json } func (h *Handler) Initialise(roomID string, state []json.RawMessage) { - added, snapID, err := h.Store.Initialise(roomID, state) + res, err := h.Store.Initialise(roomID, state) if err != nil { logger.Err(err).Int("state", len(state)).Str("room", roomID).Msg("V2: failed to initialise room") sentry.CaptureException(err) return } - if !added { + if !res.AddedEvents { // no new events return } h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2Initialise{ RoomID: roomID, - SnapshotNID: snapID, + SnapshotNID: res.SnapshotID, }) } From 4ba80d2b8390c4ab68115100eb19371a0a87abdd Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 17 Apr 2023 19:13:12 +0100 Subject: [PATCH 08/39] Initialise: handle state blocks from a gappy sync --- state/accumulator.go | 121 +++++++++++++++++++++++++++++++++---------- state/event_table.go | 19 +++++++ 2 files changed, 114 insertions(+), 26 deletions(-) diff --git a/state/accumulator.go b/state/accumulator.go index 731546eb..b911b19f 100644 --- a/state/accumulator.go +++ b/state/accumulator.go @@ -150,24 +150,31 @@ type InitialiseResult struct { // This function: // - Stores these events // - Sets up the current snapshot based on the state list given. -func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (res InitialiseResult, err error) { +func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (res InitialiseResult, outerErr error) { if len(state) == 0 { return res, nil } - err = sqlutil.WithTransaction(a.db, func(txn *sqlx.Tx) error { - // Attempt to short-circuit. This has to be done inside a transaction to make sure + outerErr = sqlutil.WithTransaction(a.db, func(txn *sqlx.Tx) error { + // This has to be done inside a transaction to make sure // we don't race with multiple calls to Initialise with the same room ID. snapshotID, err := a.roomsTable.CurrentAfterSnapshotID(txn, roomID) if err != nil { return fmt.Errorf("error fetching snapshot id for room %s: %s", roomID, err) } - if snapshotID > 0 { - // we only initialise rooms once - logger.Info().Str("room_id", roomID).Int64("snapshot_id", snapshotID).Msg("Accumulator.Initialise called but current snapshot already exists, bailing early") - return nil + unknownRoom := snapshotID == 0 + if !unknownRoom { + const warningMsg = "Accumulator.Initialise called when current snapshot already exists. Patching in events" + logger.Warn().Str("room_id", roomID).Int64("snapshot_id", snapshotID).Msg(warningMsg) + sentry.WithScope(func(scope *sentry.Scope) { + scope.SetContext("sliding-sync", map[string]interface{}{ + "room_id": roomID, + "snapshot_id": snapshotID, + }) + sentry.CaptureException(fmt.Errorf(warningMsg)) + }) } - // Insert the events + // Parse the events events := make([]Event, len(state)) for i := range events { events[i] = Event{ @@ -176,10 +183,45 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (res In IsState: true, } } - if err := ensureFieldsSet(events); err != nil { + if err = ensureFieldsSet(events); err != nil { return fmt.Errorf("events malformed: %s", err) } - eventIDToNID, err := a.eventsTable.Insert(txn, events, false) + + // Determine which events should be inserted. + var insertEvents []Event + if unknownRoom { + insertEvents = events + } else { + // Select the events which do not have a NID + eventIDs := make([]string, len(events)) + for i := range events { + eventIDs[i] = events[i].ID + } + unknownEventIDs, err := a.eventsTable.SelectUnknownEventIDs(txn, eventIDs) + if err != nil { + return fmt.Errorf("error determing which event IDs are unknown") + } + if len(unknownEventIDs) == 0 { + // All events known. Odd, but nothing to do. + return nil + } + Outer: + for i := range events { + for j := range unknownEventIDs { + if events[i].ID == unknownEventIDs[j] { + insertEvents = append(insertEvents, events[i]) + continue Outer + } + } + } + } + if len(insertEvents) == 0 { + // All events known---nothing to do here. + return nil + } + + // Insert new events + eventIDToNID, err := a.eventsTable.Insert(txn, insertEvents, false) if err != nil { return fmt.Errorf("failed to insert events: %w", err) } @@ -192,24 +234,45 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (res In return nil } - // pull out the event NIDs we just inserted - membershipEventIDs := make(map[string]struct{}, len(events)) - for _, event := range events { - if event.Type == "m.room.member" { - membershipEventIDs[event.ID] = struct{}{} + // Determine the NIDs in the snapshot which includes the new state events + var memberNIDs, otherNIDs []int64 + if unknownRoom { + // Split the new NIDs into membership and nonmemberships. + membershipEventIDs := make(map[string]struct{}, len(events)) + for _, event := range events { + if event.Type == "m.room.member" { + membershipEventIDs[event.ID] = struct{}{} + } } - } - memberNIDs := make([]int64, 0, len(eventIDToNID)) - otherNIDs := make([]int64, 0, len(eventIDToNID)) - for evID, nid := range eventIDToNID { - if _, exists := membershipEventIDs[evID]; exists { - memberNIDs = append(memberNIDs, int64(nid)) - } else { - otherNIDs = append(otherNIDs, int64(nid)) + memberNIDs = make([]int64, 0, len(eventIDToNID)) + otherNIDs = make([]int64, 0, len(eventIDToNID)) + for evID, nid := range eventIDToNID { + if _, exists := membershipEventIDs[evID]; exists { + memberNIDs = append(memberNIDs, int64(nid)) + } else { + otherNIDs = append(otherNIDs, int64(nid)) + } + } + if err != nil { + return fmt.Errorf("failed to insert snapshot: %w", err) + } + } else { + // Update the existing snapshot, then extract NIDs. + stateEvents, err := a.strippedEventsForSnapshot(txn, snapshotID) + if err != nil { + return fmt.Errorf("failed to load stripped state events for snapshot %d: %s", snapshotID, err) } + var newStripped StrippedEvents + for _, ev := range insertEvents { + stateEvents, _, err = a.calculateNewSnapshot(stateEvents, ev) + if err != nil { + return fmt.Errorf("failed to calculateNewSnapshot: %s", err) + } + } + memberNIDs, otherNIDs = newStripped.NIDs() } - // Make a current snapshot + // Insert the new snapshot snapshot := &SnapshotRow{ RoomID: roomID, MembershipEvents: pq.Int64Array(memberNIDs), @@ -220,6 +283,12 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (res In return fmt.Errorf("failed to insert snapshot: %w", err) } res.AddedEvents = true + if !unknownRoom { + res.PrependTimelineEvents = make([]json.RawMessage, len(insertEvents)) + for i := range insertEvents { + res.PrependTimelineEvents[i] = insertEvents[i].JSON + } + } latestNID := int64(0) for _, nid := range otherNIDs { if nid > latestNID { @@ -244,10 +313,10 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (res In // will have an associated state snapshot ID on the event. // Set the snapshot ID as the current state - res.snapID = snapshot.SnapshotID + res.SnapshotID = snapshot.SnapshotID return a.roomsTable.Upsert(txn, info, snapshot.SnapshotID, latestNID) }) - return res, err + return res, outerErr } // Accumulate internal state from a user's sync response. The timeline order MUST be in the order diff --git a/state/event_table.go b/state/event_table.go index 9ec11c62..5072a6a5 100644 --- a/state/event_table.go +++ b/state/event_table.go @@ -259,6 +259,25 @@ func (t *EventTable) SelectStrippedEventsByIDs(txn *sqlx.Tx, verifyAll bool, ids } +// SelectUnknownEventIDs accepts a list of event IDs and returns the subset of those which are not known to the DB. +// The order of event IDs in the return value is not guaranteed. +func (t *EventTable) SelectUnknownEventIDs(txn *sqlx.Tx, maybeUnknownEventIDs []string) ([]string, error) { + queryStr := ` + WITH maybe_unknown_events(event_id) AS (SELECT unnest($1::text[])) + SELECT event_id + FROM maybe_unknown_events LEFT JOIN syncv3_events USING(event_id) + WHERE event_nid IS NULL;` + + var unknownEventIDs []string + var err error + if txn != nil { + err = txn.Select(&unknownEventIDs, queryStr, maybeUnknownEventIDs) + } else { + err = t.db.Select(&unknownEventIDs, queryStr, maybeUnknownEventIDs) + } + return unknownEventIDs, err +} + // UpdateBeforeSnapshotID sets the before_state_snapshot_id field to `snapID` for the given NIDs. func (t *EventTable) UpdateBeforeSnapshotID(txn *sqlx.Tx, eventNID, snapID, replacesNID int64) error { _, err := txn.Exec( From 9fdb00129effddb478ac83b686bc203098cb6619 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 17 Apr 2023 20:20:37 +0100 Subject: [PATCH 09/39] TODO comment --- state/accumulator.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/state/accumulator.go b/state/accumulator.go index b911b19f..3fedd693 100644 --- a/state/accumulator.go +++ b/state/accumulator.go @@ -163,6 +163,15 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (res In } unknownRoom := snapshotID == 0 if !unknownRoom { + // TODO: suppose Alice's poller has already initialised this room. Then Bob + // shows up and we make a brand new poller for him. When his poller initial + // syncs, the state block will be passed here. If that block includes a + // state event that Alice's poller hasn't seen, we'll now end up adding it + // to the DB here---without a snapshot ID---instead of in an Accumulate + // call (with a snapshot ID). + // + // Can we prevent this by passing in a bool initialPollerSync arg? + // (Return early if !initialPollerSync and !unknownRoom) const warningMsg = "Accumulator.Initialise called when current snapshot already exists. Patching in events" logger.Warn().Str("room_id", roomID).Int64("snapshot_id", snapshotID).Msg(warningMsg) sentry.WithScope(func(scope *sentry.Scope) { From 33b174dd6779eb63f158711ada44867f6461b863 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 17 Apr 2023 20:26:40 +0100 Subject: [PATCH 10/39] Fixup test code --- state/accumulator_test.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/state/accumulator_test.go b/state/accumulator_test.go index 53386461..2be71d93 100644 --- a/state/accumulator_test.go +++ b/state/accumulator_test.go @@ -23,11 +23,11 @@ func TestAccumulatorInitialise(t *testing.T) { db, close := connectToDB(t) defer close() accumulator := NewAccumulator(db) - added, initSnapID, err := accumulator.Initialise(roomID, roomEvents) + res, err := accumulator.Initialise(roomID, roomEvents) if err != nil { t.Fatalf("falied to Initialise accumulator: %s", err) } - if !added { + if !res.AddedEvents { t.Fatalf("didn't add events, wanted it to") } @@ -45,8 +45,8 @@ func TestAccumulatorInitialise(t *testing.T) { if snapID == 0 { t.Fatalf("Initialise did not store a current snapshot") } - if snapID != initSnapID { - t.Fatalf("Initialise returned wrong snapshot ID, got %v want %v", initSnapID, snapID) + if snapID != res.SnapshotID { + t.Fatalf("Initialise returned wrong snapshot ID, got %v want %v", res.SnapshotID, snapID) } // this snapshot should have 1 member event and 2 other events in it @@ -80,11 +80,11 @@ func TestAccumulatorInitialise(t *testing.T) { } // Subsequent calls do nothing and are not an error - added, _, err = accumulator.Initialise(roomID, roomEvents) + res, err = accumulator.Initialise(roomID, roomEvents) if err != nil { t.Fatalf("falied to Initialise accumulator: %s", err) } - if added { + if res.AddedEvents { t.Fatalf("added events when it shouldn't have") } } @@ -99,7 +99,7 @@ func TestAccumulatorAccumulate(t *testing.T) { db, close := connectToDB(t) defer close() accumulator := NewAccumulator(db) - _, _, err := accumulator.Initialise(roomID, roomEvents) + _, err := accumulator.Initialise(roomID, roomEvents) if err != nil { t.Fatalf("failed to Initialise accumulator: %s", err) } @@ -195,7 +195,7 @@ func TestAccumulatorDelta(t *testing.T) { db, close := connectToDB(t) defer close() accumulator := NewAccumulator(db) - _, _, err := accumulator.Initialise(roomID, nil) + _, err := accumulator.Initialise(roomID, nil) if err != nil { t.Fatalf("failed to Initialise accumulator: %s", err) } @@ -244,7 +244,7 @@ func TestAccumulatorMembershipLogs(t *testing.T) { db, close := connectToDB(t) defer close() accumulator := NewAccumulator(db) - _, _, err := accumulator.Initialise(roomID, nil) + _, err := accumulator.Initialise(roomID, nil) if err != nil { t.Fatalf("failed to Initialise accumulator: %s", err) } @@ -384,7 +384,7 @@ func TestAccumulatorDupeEvents(t *testing.T) { defer close() accumulator := NewAccumulator(db) roomID := "!buggy:localhost" - _, _, err := accumulator.Initialise(roomID, joinRoom.State.Events) + _, err := accumulator.Initialise(roomID, joinRoom.State.Events) if err != nil { t.Fatalf("failed to Initialise accumulator: %s", err) } @@ -426,7 +426,7 @@ func TestAccumulatorMisorderedGraceful(t *testing.T) { accumulator := NewAccumulator(db) roomID := "!TestAccumulatorStateReset:localhost" // Create a room with initial state A,C - _, _, err := accumulator.Initialise(roomID, []json.RawMessage{ + _, err := accumulator.Initialise(roomID, []json.RawMessage{ eventA, eventC, }) if err != nil { From ac9651ce7d16f053b8e2313801869f89e56c26db Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 17 Apr 2023 20:43:23 +0100 Subject: [PATCH 11/39] Propagate err message --- state/accumulator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/state/accumulator.go b/state/accumulator.go index 3fedd693..56cc6040 100644 --- a/state/accumulator.go +++ b/state/accumulator.go @@ -208,7 +208,7 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (res In } unknownEventIDs, err := a.eventsTable.SelectUnknownEventIDs(txn, eventIDs) if err != nil { - return fmt.Errorf("error determing which event IDs are unknown") + return fmt.Errorf("error determing which event IDs are unknown: %s", err) } if len(unknownEventIDs) == 0 { // All events known. Odd, but nothing to do. From 5d8560cd7c1e0deaa96d20c771d7f9167bca34b6 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 17 Apr 2023 21:11:51 +0100 Subject: [PATCH 12/39] Fix db query --- state/event_table.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/state/event_table.go b/state/event_table.go index 5072a6a5..c712596c 100644 --- a/state/event_table.go +++ b/state/event_table.go @@ -271,9 +271,9 @@ func (t *EventTable) SelectUnknownEventIDs(txn *sqlx.Tx, maybeUnknownEventIDs [] var unknownEventIDs []string var err error if txn != nil { - err = txn.Select(&unknownEventIDs, queryStr, maybeUnknownEventIDs) + err = txn.Select(&unknownEventIDs, queryStr, pq.StringArray(maybeUnknownEventIDs)) } else { - err = t.db.Select(&unknownEventIDs, queryStr, maybeUnknownEventIDs) + err = t.db.Select(&unknownEventIDs, queryStr, pq.StringArray(maybeUnknownEventIDs)) } return unknownEventIDs, err } From b7a8e7dd354d807f5bd6a4036fc1daec6694a3fb Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 17 Apr 2023 21:12:08 +0100 Subject: [PATCH 13/39] Improve logging --- state/accumulator.go | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/state/accumulator.go b/state/accumulator.go index 56cc6040..f9fbbe17 100644 --- a/state/accumulator.go +++ b/state/accumulator.go @@ -172,15 +172,7 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (res In // // Can we prevent this by passing in a bool initialPollerSync arg? // (Return early if !initialPollerSync and !unknownRoom) - const warningMsg = "Accumulator.Initialise called when current snapshot already exists. Patching in events" - logger.Warn().Str("room_id", roomID).Int64("snapshot_id", snapshotID).Msg(warningMsg) - sentry.WithScope(func(scope *sentry.Scope) { - scope.SetContext("sliding-sync", map[string]interface{}{ - "room_id": roomID, - "snapshot_id": snapshotID, - }) - sentry.CaptureException(fmt.Errorf(warningMsg)) - }) + logger.Warn().Str("room_id", roomID).Int64("snapshot_id", snapshotID).Msg("Accumulator.Initialise called when current snapshot already exists. May patch in events.") } // Parse the events @@ -229,6 +221,18 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (res In return nil } + if !unknownRoom { + logger.Warn().Str("room_id", roomID).Int64("snapshot_id", snapshotID).Int("num_insert_events", len(insertEvents)).Msg("Accumulator.Initialise: patching in events") + sentry.WithScope(func(scope *sentry.Scope) { + scope.SetContext("sliding-sync", map[string]interface{}{ + "room_id": roomID, + "snapshot_id": snapshotID, + "num_insert_events": len(insertEvents), + }) + sentry.CaptureException(fmt.Errorf("Accumulator.Initialise: patching in events", len(insertEvents))) + }) + } + // Insert new events eventIDToNID, err := a.eventsTable.Insert(txn, insertEvents, false) if err != nil { From e3008e6f6a1c62bfc8d8db519ce4c261ceb4b5d5 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 17 Apr 2023 21:44:55 +0100 Subject: [PATCH 14/39] Fix the logging --- state/accumulator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/state/accumulator.go b/state/accumulator.go index f9fbbe17..ee47d527 100644 --- a/state/accumulator.go +++ b/state/accumulator.go @@ -229,7 +229,7 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (res In "snapshot_id": snapshotID, "num_insert_events": len(insertEvents), }) - sentry.CaptureException(fmt.Errorf("Accumulator.Initialise: patching in events", len(insertEvents))) + sentry.CaptureException(fmt.Errorf("Accumulator.Initialise: patching in %d events", len(insertEvents))) }) } From 2406da5cf3ea8247d9fbe961ea857c9f57b1e555 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 17 Apr 2023 21:57:04 +0100 Subject: [PATCH 15/39] TODO note --- state/accumulator.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/state/accumulator.go b/state/accumulator.go index ee47d527..16211ca8 100644 --- a/state/accumulator.go +++ b/state/accumulator.go @@ -208,6 +208,8 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (res In } Outer: for i := range events { + // TODO: should this be a set of ids? Otherwise the double loop is + // quadratic in the size of the gap. for j := range unknownEventIDs { if events[i].ID == unknownEventIDs[j] { insertEvents = append(insertEvents, events[i]) From f28f7d0599a91d9f424eaee4e9b4c3c9f9ae9550 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Tue, 18 Apr 2023 14:55:58 +0100 Subject: [PATCH 16/39] Return set of unknown events from the db --- state/event_table.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/state/event_table.go b/state/event_table.go index c712596c..3be7d390 100644 --- a/state/event_table.go +++ b/state/event_table.go @@ -260,8 +260,10 @@ func (t *EventTable) SelectStrippedEventsByIDs(txn *sqlx.Tx, verifyAll bool, ids } // SelectUnknownEventIDs accepts a list of event IDs and returns the subset of those which are not known to the DB. -// The order of event IDs in the return value is not guaranteed. -func (t *EventTable) SelectUnknownEventIDs(txn *sqlx.Tx, maybeUnknownEventIDs []string) ([]string, error) { +func (t *EventTable) SelectUnknownEventIDs(txn *sqlx.Tx, maybeUnknownEventIDs []string) (map[string]struct{}, error) { + // Note: in practice, the order of rows returned matches the order of rows of + // array entries. But I don't think that's guaranteed. Return an (unordered) set + // out of paranoia. queryStr := ` WITH maybe_unknown_events(event_id) AS (SELECT unnest($1::text[])) SELECT event_id @@ -275,7 +277,14 @@ func (t *EventTable) SelectUnknownEventIDs(txn *sqlx.Tx, maybeUnknownEventIDs [] } else { err = t.db.Select(&unknownEventIDs, queryStr, pq.StringArray(maybeUnknownEventIDs)) } - return unknownEventIDs, err + if err != nil { + return nil, err + } + unknownMap := make(map[string]struct{}, len(unknownEventIDs)) + for _, eventID := range unknownEventIDs { + unknownMap[eventID] = struct{}{} + } + return unknownMap, nil } // UpdateBeforeSnapshotID sets the before_state_snapshot_id field to `snapID` for the given NIDs. From 9d53a871d59c4e8b54bc915e5477a89d0928b454 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Tue, 18 Apr 2023 14:56:10 +0100 Subject: [PATCH 17/39] Simply Initialise to bail out early --- state/accumulator.go | 163 ++++++++++++++----------------------------- 1 file changed, 54 insertions(+), 109 deletions(-) diff --git a/state/accumulator.go b/state/accumulator.go index 16211ca8..264b07ba 100644 --- a/state/accumulator.go +++ b/state/accumulator.go @@ -150,32 +150,53 @@ type InitialiseResult struct { // This function: // - Stores these events // - Sets up the current snapshot based on the state list given. -func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (res InitialiseResult, outerErr error) { +func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (InitialiseResult, error) { + var res InitialiseResult if len(state) == 0 { return res, nil } - outerErr = sqlutil.WithTransaction(a.db, func(txn *sqlx.Tx) error { - // This has to be done inside a transaction to make sure + err := sqlutil.WithTransaction(a.db, func(txn *sqlx.Tx) error { + // Attempt to short-circuit. This has to be done inside a transaction to make sure // we don't race with multiple calls to Initialise with the same room ID. snapshotID, err := a.roomsTable.CurrentAfterSnapshotID(txn, roomID) if err != nil { return fmt.Errorf("error fetching snapshot id for room %s: %s", roomID, err) } - unknownRoom := snapshotID == 0 - if !unknownRoom { - // TODO: suppose Alice's poller has already initialised this room. Then Bob - // shows up and we make a brand new poller for him. When his poller initial - // syncs, the state block will be passed here. If that block includes a - // state event that Alice's poller hasn't seen, we'll now end up adding it - // to the DB here---without a snapshot ID---instead of in an Accumulate - // call (with a snapshot ID). - // - // Can we prevent this by passing in a bool initialPollerSync arg? - // (Return early if !initialPollerSync and !unknownRoom) - logger.Warn().Str("room_id", roomID).Int64("snapshot_id", snapshotID).Msg("Accumulator.Initialise called when current snapshot already exists. May patch in events.") + if snapshotID > 0 { + // If the state block has a create event, it must be an initial sync. Ignore it. + fullState := false + for _, event := range state { + parsed := gjson.ParseBytes(event) + typeField := parsed.Get("type") + stateKeyField := parsed.Get("state_key") + if typeField.Str == "m.room.create" && stateKeyField.Exists() && stateKeyField.Str == "" { + fullState = true + } + } + if fullState { + logger.Info().Str("room_id", roomID).Int64("snapshot_id", snapshotID).Msg("Accumulator.Initialise called with full state but current snapshot already exists, bailing early") + } else { + // Otherwise, this is a diff as part of an incremental sync. Work out + // which state events (if any) we should prepend to the timeline. + logger.Warn().Str("room_id", roomID).Int64("snapshot_id", snapshotID).Msg("Accumulator.Initialise called with incremental state but current snapshot already exists.") + eventIDs := make([]string, len(state)) + for i := range state { + eventIDs[i] = gjson.ParseBytes(state[i]).Get("event_id").Str + } + unknownEventIDs, err := a.eventsTable.SelectUnknownEventIDs(txn, eventIDs) + if err != nil { + return fmt.Errorf("error determing which event IDs are unknown: %s", err) + } + for i := range state { + if _, unknown := unknownEventIDs[eventIDs[i]]; unknown { + res.PrependTimelineEvents = append(res.PrependTimelineEvents, state[i]) + } + } + } + return nil } - // Parse the events + // Insert the events events := make([]Event, len(state)) for i := range events { events[i] = Event{ @@ -184,59 +205,10 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (res In IsState: true, } } - if err = ensureFieldsSet(events); err != nil { + if err := ensureFieldsSet(events); err != nil { return fmt.Errorf("events malformed: %s", err) } - - // Determine which events should be inserted. - var insertEvents []Event - if unknownRoom { - insertEvents = events - } else { - // Select the events which do not have a NID - eventIDs := make([]string, len(events)) - for i := range events { - eventIDs[i] = events[i].ID - } - unknownEventIDs, err := a.eventsTable.SelectUnknownEventIDs(txn, eventIDs) - if err != nil { - return fmt.Errorf("error determing which event IDs are unknown: %s", err) - } - if len(unknownEventIDs) == 0 { - // All events known. Odd, but nothing to do. - return nil - } - Outer: - for i := range events { - // TODO: should this be a set of ids? Otherwise the double loop is - // quadratic in the size of the gap. - for j := range unknownEventIDs { - if events[i].ID == unknownEventIDs[j] { - insertEvents = append(insertEvents, events[i]) - continue Outer - } - } - } - } - if len(insertEvents) == 0 { - // All events known---nothing to do here. - return nil - } - - if !unknownRoom { - logger.Warn().Str("room_id", roomID).Int64("snapshot_id", snapshotID).Int("num_insert_events", len(insertEvents)).Msg("Accumulator.Initialise: patching in events") - sentry.WithScope(func(scope *sentry.Scope) { - scope.SetContext("sliding-sync", map[string]interface{}{ - "room_id": roomID, - "snapshot_id": snapshotID, - "num_insert_events": len(insertEvents), - }) - sentry.CaptureException(fmt.Errorf("Accumulator.Initialise: patching in %d events", len(insertEvents))) - }) - } - - // Insert new events - eventIDToNID, err := a.eventsTable.Insert(txn, insertEvents, false) + eventIDToNID, err := a.eventsTable.Insert(txn, events, false) if err != nil { return fmt.Errorf("failed to insert events: %w", err) } @@ -249,45 +221,24 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (res In return nil } - // Determine the NIDs in the snapshot which includes the new state events - var memberNIDs, otherNIDs []int64 - if unknownRoom { - // Split the new NIDs into membership and nonmemberships. - membershipEventIDs := make(map[string]struct{}, len(events)) - for _, event := range events { - if event.Type == "m.room.member" { - membershipEventIDs[event.ID] = struct{}{} - } - } - memberNIDs = make([]int64, 0, len(eventIDToNID)) - otherNIDs = make([]int64, 0, len(eventIDToNID)) - for evID, nid := range eventIDToNID { - if _, exists := membershipEventIDs[evID]; exists { - memberNIDs = append(memberNIDs, int64(nid)) - } else { - otherNIDs = append(otherNIDs, int64(nid)) - } + // pull out the event NIDs we just inserted + membershipEventIDs := make(map[string]struct{}, len(events)) + for _, event := range events { + if event.Type == "m.room.member" { + membershipEventIDs[event.ID] = struct{}{} } - if err != nil { - return fmt.Errorf("failed to insert snapshot: %w", err) - } - } else { - // Update the existing snapshot, then extract NIDs. - stateEvents, err := a.strippedEventsForSnapshot(txn, snapshotID) - if err != nil { - return fmt.Errorf("failed to load stripped state events for snapshot %d: %s", snapshotID, err) - } - var newStripped StrippedEvents - for _, ev := range insertEvents { - stateEvents, _, err = a.calculateNewSnapshot(stateEvents, ev) - if err != nil { - return fmt.Errorf("failed to calculateNewSnapshot: %s", err) - } + } + memberNIDs := make([]int64, 0, len(eventIDToNID)) + otherNIDs := make([]int64, 0, len(eventIDToNID)) + for evID, nid := range eventIDToNID { + if _, exists := membershipEventIDs[evID]; exists { + memberNIDs = append(memberNIDs, int64(nid)) + } else { + otherNIDs = append(otherNIDs, int64(nid)) } - memberNIDs, otherNIDs = newStripped.NIDs() } - // Insert the new snapshot + // Make a current snapshot snapshot := &SnapshotRow{ RoomID: roomID, MembershipEvents: pq.Int64Array(memberNIDs), @@ -298,12 +249,6 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (res In return fmt.Errorf("failed to insert snapshot: %w", err) } res.AddedEvents = true - if !unknownRoom { - res.PrependTimelineEvents = make([]json.RawMessage, len(insertEvents)) - for i := range insertEvents { - res.PrependTimelineEvents[i] = insertEvents[i].JSON - } - } latestNID := int64(0) for _, nid := range otherNIDs { if nid > latestNID { @@ -331,7 +276,7 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (res In res.SnapshotID = snapshot.SnapshotID return a.roomsTable.Upsert(txn, info, snapshot.SnapshotID, latestNID) }) - return res, outerErr + return res, err } // Accumulate internal state from a user's sync response. The timeline order MUST be in the order From 2bae78476db4f3ba1f66936deb5e1a3af11981da Mon Sep 17 00:00:00 2001 From: David Robertson Date: Tue, 18 Apr 2023 14:56:25 +0100 Subject: [PATCH 18/39] Propagate prepend events to poller, and inject --- sync2/handler2/handler.go | 17 ++++++++--------- sync2/poller.go | 28 ++++++++++++++++++++++++---- 2 files changed, 32 insertions(+), 13 deletions(-) diff --git a/sync2/handler2/handler.go b/sync2/handler2/handler.go index 030c6c4c..df583bed 100644 --- a/sync2/handler2/handler.go +++ b/sync2/handler2/handler.go @@ -240,21 +240,20 @@ func (h *Handler) Accumulate(deviceID, roomID, prevBatch string, timeline []json }) } -func (h *Handler) Initialise(roomID string, state []json.RawMessage) { +func (h *Handler) Initialise(roomID string, state []json.RawMessage) []json.RawMessage { res, err := h.Store.Initialise(roomID, state) if err != nil { logger.Err(err).Int("state", len(state)).Str("room", roomID).Msg("V2: failed to initialise room") sentry.CaptureException(err) - return + return nil } - if !res.AddedEvents { - // no new events - return + if res.AddedEvents { + h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2Initialise{ + RoomID: roomID, + SnapshotNID: res.SnapshotID, + }) } - h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2Initialise{ - RoomID: roomID, - SnapshotNID: res.SnapshotID, - }) + return res.PrependTimelineEvents } func (h *Handler) SetTyping(roomID string, ephEvent json.RawMessage) { diff --git a/sync2/poller.go b/sync2/poller.go index ca4dc791..842b8fbd 100644 --- a/sync2/poller.go +++ b/sync2/poller.go @@ -3,6 +3,7 @@ package sync2 import ( "context" "encoding/json" + "github.com/getsentry/sentry-go" "sync" "sync/atomic" "time" @@ -23,7 +24,8 @@ type V2DataReceiver interface { // Accumulate data for this room. This means the timeline section of the v2 response. Accumulate(deviceID, roomID, prevBatch string, timeline []json.RawMessage) // latest pos with event nids of timeline entries // Initialise the room, if it hasn't been already. This means the state section of the v2 response. - Initialise(roomID string, state []json.RawMessage) // snapshot ID? + // If given a state delta from an incremental sync, returns the slice of all state events unknown to the DB. + Initialise(roomID string, state []json.RawMessage) []json.RawMessage // snapshot ID? // SetTyping indicates which users are typing. SetTyping(roomID string, ephEvent json.RawMessage) // Sent when there is a new receipt @@ -196,14 +198,15 @@ func (h *PollerMap) Accumulate(deviceID, roomID, prevBatch string, timeline []js } wg.Wait() } -func (h *PollerMap) Initialise(roomID string, state []json.RawMessage) { +func (h *PollerMap) Initialise(roomID string, state []json.RawMessage) (result []json.RawMessage) { var wg sync.WaitGroup wg.Add(1) h.executor <- func() { - h.callbacks.Initialise(roomID, state) + result = h.callbacks.Initialise(roomID, state) wg.Done() } wg.Wait() + return } func (h *PollerMap) SetTyping(roomID string, ephEvent json.RawMessage) { var wg sync.WaitGroup @@ -496,7 +499,24 @@ func (p *poller) parseRoomsResponse(res *SyncResponse) { for roomID, roomData := range res.Rooms.Join { if len(roomData.State.Events) > 0 { stateCalls++ - p.receiver.Initialise(roomID, roomData.State.Events) + prependStateEvents := p.receiver.Initialise(roomID, roomData.State.Events) + if len(prependStateEvents) > 0 { + // The poller has just learned of these state events due to an + // incremental poller sync; we must have missed the opportunity to see + // these down /sync in a timeline. As a workaround, inject these into + // the timeline now so that future events are received under the + // correct room state. + const warnMsg = "parseRoomsResponse: prepending state events to timeline after gappy poll" + log.Warn().Str("room_id", roomID).Int("prependStateEvents", len(prependStateEvents)).Msg(warnMsg) + sentry.WithScope(func(scope *sentry.Scope) { + scope.SetContext("sliding-sync", map[string]interface{}{ + "room_id": roomID, + "num_prepend_state_events": len(prependStateEvents), + }) + sentry.CaptureMessage(warnMsg) + }) + roomData.Timeline.Events = append(prependStateEvents, roomData.Timeline.Events...) + } } // process typing/receipts before events so we seed the caches correctly for when we return the room for _, ephEvent := range roomData.Ephemeral.Events { From 5621423295dfbcbf7f14961fb7f3a668008b2ce3 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Tue, 18 Apr 2023 15:16:42 +0100 Subject: [PATCH 19/39] Fix tests --- sync2/poller_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sync2/poller_test.go b/sync2/poller_test.go index 6b614220..f5bfb425 100644 --- a/sync2/poller_test.go +++ b/sync2/poller_test.go @@ -460,7 +460,7 @@ type mockDataReceiver struct { func (a *mockDataReceiver) Accumulate(userID, roomID, prevBatch string, timeline []json.RawMessage) { a.timelines[roomID] = append(a.timelines[roomID], timeline...) } -func (a *mockDataReceiver) Initialise(roomID string, state []json.RawMessage) { +func (a *mockDataReceiver) Initialise(roomID string, state []json.RawMessage) []json.RawMessage { a.states[roomID] = state if a.incomingProcess != nil { a.incomingProcess <- struct{}{} @@ -468,6 +468,9 @@ func (a *mockDataReceiver) Initialise(roomID string, state []json.RawMessage) { if a.unblockProcess != nil { <-a.unblockProcess } + // The return value is a list of unknown state events to be prepended to the room + // timeline. Untested here---return nil for now. + return nil } func (a *mockDataReceiver) SetTyping(roomID string, ephEvent json.RawMessage) { } From 9af0471db4d08b792e048fe6ac8cb9d17908a3af Mon Sep 17 00:00:00 2001 From: David Robertson Date: Tue, 18 Apr 2023 15:46:47 +0100 Subject: [PATCH 20/39] Always look for unknown state events --- state/accumulator.go | 51 +++++++++++++++++++++----------------------- 1 file changed, 24 insertions(+), 27 deletions(-) diff --git a/state/accumulator.go b/state/accumulator.go index 264b07ba..5fc8652e 100644 --- a/state/accumulator.go +++ b/state/accumulator.go @@ -145,11 +145,18 @@ type InitialiseResult struct { } // Initialise starts a new sync accumulator for the given room using the given state as a baseline. +// // This will only take effect if this is the first time the v3 server has seen this room, and it wasn't // possible to get all events up to the create event (e.g Matrix HQ). // This function: // - Stores these events // - Sets up the current snapshot based on the state list given. +// +// If the v3 server has seen this room before, this function +// - queries the DB to determine which state events are known to th server, +// - returns (via InitialiseResult.PrependTimelineEvents) a slice of unknown state events, +// +// and otherwise does nothing. func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (InitialiseResult, error) { var res InitialiseResult if len(state) == 0 { @@ -163,34 +170,24 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (Initia return fmt.Errorf("error fetching snapshot id for room %s: %s", roomID, err) } if snapshotID > 0 { - // If the state block has a create event, it must be an initial sync. Ignore it. - fullState := false - for _, event := range state { - parsed := gjson.ParseBytes(event) - typeField := parsed.Get("type") - stateKeyField := parsed.Get("state_key") - if typeField.Str == "m.room.create" && stateKeyField.Exists() && stateKeyField.Str == "" { - fullState = true - } + // Poller A has received a gappy sync v2 response with a state block, and + // we have seen this room before. If we knew for certain that there is some + // other active poller B in this room then we could safely skip this logic. + + // Log at debug for now. If we find an unknown event, we'll return it so + // that the poller can log a warning. + logger.Debug().Str("room_id", roomID).Int64("snapshot_id", snapshotID).Msg("Accumulator.Initialise called with incremental state but current snapshot already exists.") + eventIDs := make([]string, len(state)) + for i := range state { + eventIDs[i] = gjson.ParseBytes(state[i]).Get("event_id").Str } - if fullState { - logger.Info().Str("room_id", roomID).Int64("snapshot_id", snapshotID).Msg("Accumulator.Initialise called with full state but current snapshot already exists, bailing early") - } else { - // Otherwise, this is a diff as part of an incremental sync. Work out - // which state events (if any) we should prepend to the timeline. - logger.Warn().Str("room_id", roomID).Int64("snapshot_id", snapshotID).Msg("Accumulator.Initialise called with incremental state but current snapshot already exists.") - eventIDs := make([]string, len(state)) - for i := range state { - eventIDs[i] = gjson.ParseBytes(state[i]).Get("event_id").Str - } - unknownEventIDs, err := a.eventsTable.SelectUnknownEventIDs(txn, eventIDs) - if err != nil { - return fmt.Errorf("error determing which event IDs are unknown: %s", err) - } - for i := range state { - if _, unknown := unknownEventIDs[eventIDs[i]]; unknown { - res.PrependTimelineEvents = append(res.PrependTimelineEvents, state[i]) - } + unknownEventIDs, err := a.eventsTable.SelectUnknownEventIDs(txn, eventIDs) + if err != nil { + return fmt.Errorf("error determing which event IDs are unknown: %s", err) + } + for i := range state { + if _, unknown := unknownEventIDs[eventIDs[i]]; unknown { + res.PrependTimelineEvents = append(res.PrependTimelineEvents, state[i]) } } return nil From b5893b1b3a8eaf6293a25c00f7217873a7128439 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Tue, 18 Apr 2023 15:51:10 +0100 Subject: [PATCH 21/39] Better error checking in test --- tests-e2e/gappy_state_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests-e2e/gappy_state_test.go b/tests-e2e/gappy_state_test.go index 26637e95..180021b8 100644 --- a/tests-e2e/gappy_state_test.go +++ b/tests-e2e/gappy_state_test.go @@ -83,6 +83,9 @@ func TestGappyState(t *testing.T) { roomID, m.MatchRoomName("potato"), func(r sync3.Room) error { + if len(r.Timeline) == 0 { + return fmt.Errorf("no timeline in response, expected at least one event") + } lastReceivedEventID := gjson.ParseBytes(r.Timeline[len(r.Timeline)-1]).Get("event_id").Str if lastReceivedEventID != latestMessageID { return fmt.Errorf("last message in response is %s, expected %s", lastReceivedEventID, latestMessageID) From aa07188b009fb4ce367d49310652ae0b1bc0cc6b Mon Sep 17 00:00:00 2001 From: David Robertson Date: Tue, 18 Apr 2023 18:33:29 +0100 Subject: [PATCH 22/39] Fixup test --- tests-e2e/gappy_state_test.go | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/tests-e2e/gappy_state_test.go b/tests-e2e/gappy_state_test.go index 180021b8..b76aaf20 100644 --- a/tests-e2e/gappy_state_test.go +++ b/tests-e2e/gappy_state_test.go @@ -8,6 +8,8 @@ import ( "testing" ) +// Test that state changes "missed" by a poller are injected back into the room when a +// future poller recieves a v2 incremental sync with a state block. func TestGappyState(t *testing.T) { t.Log("Alice registers on the homeserver.") alice := registerNewUser(t) @@ -17,7 +19,7 @@ func TestGappyState(t *testing.T) { roomID := alice.CreateRoom(t, map[string]interface{}{"preset": "public_chat", "name": firstRoomName}) t.Log("Alice sends a message into that room") - alice.SendEventSynced(t, roomID, Event{ + firstMessageID := alice.SendEventSynced(t, roomID, Event{ Type: "m.room.message", Content: map[string]interface{}{ "msgtype": "m.text", @@ -31,6 +33,9 @@ func TestGappyState(t *testing.T) { Lists: map[string]sync3.RequestList{ "a": { Ranges: [][2]int64{{0, 20}}, + RoomSubscription: sync3.RoomSubscription{ + TimelineLimit: 10, + }, }, }, }, @@ -38,7 +43,20 @@ func TestGappyState(t *testing.T) { m.MatchResponse( t, syncResp, - m.MatchRoomSubscription(roomID, m.MatchRoomName(firstRoomName)), + m.MatchRoomSubscription( + roomID, + m.MatchRoomName(firstRoomName), + func(r sync3.Room) error { + if len(r.Timeline) == 0 { + return fmt.Errorf("no/empty timeline in response") + } + lastReceivedEventID := gjson.ParseBytes(r.Timeline[len(r.Timeline)-1]).Get("event_id").Str + if lastReceivedEventID == firstMessageID { + return nil + } + return fmt.Errorf("expected end of timeline to be %s but got %s", firstMessageID, lastReceivedEventID) + }, + ), ) t.Log("Alice logs out of her first device.") @@ -69,16 +87,18 @@ func TestGappyState(t *testing.T) { Lists: map[string]sync3.RequestList{ "a": { Ranges: [][2]int64{{0, 20}}, + RoomSubscription: sync3.RoomSubscription{ + TimelineLimit: 10, + }, }, }, }, ) - t.Log("She should her latest message with the room name updated") + t.Log("She should see her latest message with the room name updated") m.MatchResponse( t, syncResp, - m.LogResponse(t), m.MatchRoomSubscription( roomID, m.MatchRoomName("potato"), From 7f03a3d78d4058a0f7b2580afa1fa57c24b79838 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Wed, 19 Apr 2023 13:05:01 +0100 Subject: [PATCH 23/39] Use test helper instead --- tests-e2e/gappy_state_test.go | 23 ++--------------------- 1 file changed, 2 insertions(+), 21 deletions(-) diff --git a/tests-e2e/gappy_state_test.go b/tests-e2e/gappy_state_test.go index b76aaf20..a8004a12 100644 --- a/tests-e2e/gappy_state_test.go +++ b/tests-e2e/gappy_state_test.go @@ -4,7 +4,6 @@ import ( "fmt" "github.com/matrix-org/sliding-sync/sync3" "github.com/matrix-org/sliding-sync/testutils/m" - "github.com/tidwall/gjson" "testing" ) @@ -46,16 +45,7 @@ func TestGappyState(t *testing.T) { m.MatchRoomSubscription( roomID, m.MatchRoomName(firstRoomName), - func(r sync3.Room) error { - if len(r.Timeline) == 0 { - return fmt.Errorf("no/empty timeline in response") - } - lastReceivedEventID := gjson.ParseBytes(r.Timeline[len(r.Timeline)-1]).Get("event_id").Str - if lastReceivedEventID == firstMessageID { - return nil - } - return fmt.Errorf("expected end of timeline to be %s but got %s", firstMessageID, lastReceivedEventID) - }, + MatchRoomTimelineMostRecent(1, []Event{{ID: firstMessageID}}), ), ) @@ -102,16 +92,7 @@ func TestGappyState(t *testing.T) { m.MatchRoomSubscription( roomID, m.MatchRoomName("potato"), - func(r sync3.Room) error { - if len(r.Timeline) == 0 { - return fmt.Errorf("no timeline in response, expected at least one event") - } - lastReceivedEventID := gjson.ParseBytes(r.Timeline[len(r.Timeline)-1]).Get("event_id").Str - if lastReceivedEventID != latestMessageID { - return fmt.Errorf("last message in response is %s, expected %s", lastReceivedEventID, latestMessageID) - } - return nil - }, + MatchRoomTimelineMostRecent(1, []Event{{ID: latestMessageID}}), ), ) } From e6aac43c06feef21c0798ecb8c9669d419cb319c Mon Sep 17 00:00:00 2001 From: David Robertson Date: Wed, 19 Apr 2023 13:05:16 +0100 Subject: [PATCH 24/39] SelectUnknownEventIDs expects a txn --- state/event_table.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/state/event_table.go b/state/event_table.go index 3be7d390..d702dea6 100644 --- a/state/event_table.go +++ b/state/event_table.go @@ -260,6 +260,7 @@ func (t *EventTable) SelectStrippedEventsByIDs(txn *sqlx.Tx, verifyAll bool, ids } // SelectUnknownEventIDs accepts a list of event IDs and returns the subset of those which are not known to the DB. +// It MUST be called within a transaction, or else will panic. func (t *EventTable) SelectUnknownEventIDs(txn *sqlx.Tx, maybeUnknownEventIDs []string) (map[string]struct{}, error) { // Note: in practice, the order of rows returned matches the order of rows of // array entries. But I don't think that's guaranteed. Return an (unordered) set @@ -271,13 +272,7 @@ func (t *EventTable) SelectUnknownEventIDs(txn *sqlx.Tx, maybeUnknownEventIDs [] WHERE event_nid IS NULL;` var unknownEventIDs []string - var err error - if txn != nil { - err = txn.Select(&unknownEventIDs, queryStr, pq.StringArray(maybeUnknownEventIDs)) - } else { - err = t.db.Select(&unknownEventIDs, queryStr, pq.StringArray(maybeUnknownEventIDs)) - } - if err != nil { + if err := txn.Select(&unknownEventIDs, queryStr, pq.StringArray(maybeUnknownEventIDs)); err != nil { return nil, err } unknownMap := make(map[string]struct{}, len(unknownEventIDs)) From b54ba7c01e39944d8176a9839a8747cee574aaab Mon Sep 17 00:00:00 2001 From: David Robertson Date: Wed, 19 Apr 2023 13:10:15 +0100 Subject: [PATCH 25/39] Poller test description --- tests-integration/poller_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests-integration/poller_test.go b/tests-integration/poller_test.go index 67ea6169..700bd1cb 100644 --- a/tests-integration/poller_test.go +++ b/tests-integration/poller_test.go @@ -113,7 +113,10 @@ func TestSecondPollerFiltersToDevice(t *testing.T) { m.MatchResponse(t, res, m.MatchToDeviceMessages([]json.RawMessage{wantMsg})) } -// TODO test description +// Test that the poller makes a best-effort attempt to integrate state seen in a +// v2 sync state block. Our strategy for doing so is to prepend any unknown state events +// to the start of the v2 sync response's timeline, which should then be visible to +// sync v3 clients as ordinary state events in the room timeline. func TestPollerHandlesUnknownStateEventsOnIncrementalSync(t *testing.T) { pqString := testutils.PrepareDBConnectionString() v2 := runTestV2Server(t) From 5dd43155b2fe3bbb70ff6ab2598ecf618508fdd3 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Wed, 19 Apr 2023 14:33:15 +0100 Subject: [PATCH 26/39] Test new events table query --- state/accumulator.go | 2 +- state/event_table_test.go | 63 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/state/accumulator.go b/state/accumulator.go index 5fc8652e..11edeadb 100644 --- a/state/accumulator.go +++ b/state/accumulator.go @@ -181,7 +181,7 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (Initia for i := range state { eventIDs[i] = gjson.ParseBytes(state[i]).Get("event_id").Str } - unknownEventIDs, err := a.eventsTable.SelectUnknownEventIDs(txn, eventIDs) + unknownEventIDs, err := a.eventsTable.SelectUnknownEventIds(txn, roomID, eventIDs) if err != nil { return fmt.Errorf("error determing which event IDs are unknown: %s", err) } diff --git a/state/event_table_test.go b/state/event_table_test.go index 5e7c7e2c..b964282c 100644 --- a/state/event_table_test.go +++ b/state/event_table_test.go @@ -870,3 +870,66 @@ func TestRemoveUnsignedTXNID(t *testing.T) { } } } + +func TestSelectUnknownEventIds(t *testing.T) { + db, close := connectToDB(t) + defer close() + txn, err := db.Beginx() + if err != nil { + t.Fatalf("failed to start txn: %s", err) + } + defer txn.Rollback() + const roomID = "!1:localhost" + + knownEvents := []Event{ + { + Type: "m.room.create", + StateKey: "", + IsState: true, + ID: "$A", + RoomID: roomID, + }, + { + Type: "m.room.name", + StateKey: "", + IsState: true, + ID: "$B", + RoomID: roomID, + }, + } + table := NewEventTable(db) + + // Insert the events + _, err = table.Insert(txn, knownEvents, false) + if err != nil { + t.Errorf("failed to insert event: %s", err) + } + + gotEvents, err := table.SelectByIDs(txn, true, []string{"$A", "$B"}) + if err != nil { + t.Fatalf("failed to select events: %s", err) + } + if (gotEvents[0].ID == "$A" && gotEvents[1].ID == "$B") || (gotEvents[0].ID == "$B" && gotEvents[1].ID == "$A") { + t.Logf("Got expected event IDs after insert. NIDS: %s=%d, %s=%d", gotEvents[0].ID, gotEvents[0].NID, gotEvents[1].ID, gotEvents[1].NID) + } else { + t.Logf("Event ID mismatch: expected $A and $B, got %v", gotEvents) + } + + // Someone else tells us the state of the room is {A, C}. Query which of those + // event IDs are unknown. + stateBlockIDs := []string{"$A", "$C"} + unknownIDs, err := table.SelectUnknownEventIds(txn, roomID, stateBlockIDs) + t.Logf("unknownIDs=%v", unknownIDs) + if err != nil { + t.Errorf("failed to select unknown state events: %s", err) + } + + // Only event C should be flagged as unknown. + if len(unknownIDs) != 1 { + t.Fatalf("Expected 1 unknown id, got %v", unknownIDs) + } + _, ok := unknownIDs["$C"] + if !ok { + t.Fatalf("Expected $C to be unknown to the DB, but it wasn't") + } +} From 76066f96961ab1fe3e9ab4c58687d2dbd66f1300 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Wed, 19 Apr 2023 15:15:38 +0100 Subject: [PATCH 27/39] Tidy up accumulator event ID handling --- state/accumulator.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/state/accumulator.go b/state/accumulator.go index 11edeadb..b0de435b 100644 --- a/state/accumulator.go +++ b/state/accumulator.go @@ -178,17 +178,21 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (Initia // that the poller can log a warning. logger.Debug().Str("room_id", roomID).Int64("snapshot_id", snapshotID).Msg("Accumulator.Initialise called with incremental state but current snapshot already exists.") eventIDs := make([]string, len(state)) + eventIDToRawEvent := make(map[string]json.RawMessage, len(state)) for i := range state { - eventIDs[i] = gjson.ParseBytes(state[i]).Get("event_id").Str + eventID := gjson.ParseBytes(state[i]).Get("event_id") + if !eventID.Exists() || eventID.Type != gjson.String { + return fmt.Errorf("Event %d lacks an event ID", i) + } + eventIDToRawEvent[eventID.Str] = state[i] + eventIDs[i] = eventID.Str } - unknownEventIDs, err := a.eventsTable.SelectUnknownEventIds(txn, roomID, eventIDs) + unknownEventIDs, err := a.eventsTable.SelectUnknownEventIDs(txn, eventIDs) if err != nil { return fmt.Errorf("error determing which event IDs are unknown: %s", err) } - for i := range state { - if _, unknown := unknownEventIDs[eventIDs[i]]; unknown { - res.PrependTimelineEvents = append(res.PrependTimelineEvents, state[i]) - } + for unknownEventID := range unknownEventIDs { + res.PrependTimelineEvents = append(res.PrependTimelineEvents, eventIDToRawEvent[unknownEventID]) } return nil } From 5a9fae1dea3583d58e1acea93887610e3ba50f82 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Wed, 19 Apr 2023 15:26:34 +0100 Subject: [PATCH 28/39] Fix capitalisation --- state/event_table_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/state/event_table_test.go b/state/event_table_test.go index b964282c..a4b30863 100644 --- a/state/event_table_test.go +++ b/state/event_table_test.go @@ -871,7 +871,7 @@ func TestRemoveUnsignedTXNID(t *testing.T) { } } -func TestSelectUnknownEventIds(t *testing.T) { +func TestSelectUnknownEventIDs(t *testing.T) { db, close := connectToDB(t) defer close() txn, err := db.Beginx() @@ -918,7 +918,7 @@ func TestSelectUnknownEventIds(t *testing.T) { // Someone else tells us the state of the room is {A, C}. Query which of those // event IDs are unknown. stateBlockIDs := []string{"$A", "$C"} - unknownIDs, err := table.SelectUnknownEventIds(txn, roomID, stateBlockIDs) + unknownIDs, err := table.SelectUnknownEventIDs(txn, roomID, stateBlockIDs) t.Logf("unknownIDs=%v", unknownIDs) if err != nil { t.Errorf("failed to select unknown state events: %s", err) From b32e5dacaaa691ca5f37f57a453d6f0a0ca1b810 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Wed, 19 Apr 2023 15:27:57 +0100 Subject: [PATCH 29/39] Fix test function args too Need to work out some kind of local linter here --- state/event_table_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/state/event_table_test.go b/state/event_table_test.go index a4b30863..d8d5c5ec 100644 --- a/state/event_table_test.go +++ b/state/event_table_test.go @@ -918,7 +918,7 @@ func TestSelectUnknownEventIDs(t *testing.T) { // Someone else tells us the state of the room is {A, C}. Query which of those // event IDs are unknown. stateBlockIDs := []string{"$A", "$C"} - unknownIDs, err := table.SelectUnknownEventIDs(txn, roomID, stateBlockIDs) + unknownIDs, err := table.SelectUnknownEventIDs(txn, stateBlockIDs) t.Logf("unknownIDs=%v", unknownIDs) if err != nil { t.Errorf("failed to select unknown state events: %s", err) From 00dd3963598a512e549bbe2478a578d634130947 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Wed, 19 Apr 2023 15:39:57 +0100 Subject: [PATCH 30/39] Fix MatchRoomTimelineMostRecent Suppose I build `MatchRoomTimelineMostRecent(1, []Event{C})`. this matcher was given a list `[A, B, C]` of three events. The matcher should allow that, but before this patch it wouldn't. I don't think this is the intention? --- tests-e2e/main_test.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/tests-e2e/main_test.go b/tests-e2e/main_test.go index 601bf670..41610db7 100644 --- a/tests-e2e/main_test.go +++ b/tests-e2e/main_test.go @@ -79,10 +79,22 @@ func eventsEqual(wantList []Event, gotList []json.RawMessage) error { return nil } +// MatchRoomTimelineMostRecent builds a matcher which checks that the last `n` elements +// of `events` are the same as the last n elements of the room timeline. If either list +// contains fewer than `n` events, the match fails. +// Events are tested for equality using `eventsEqual`. func MatchRoomTimelineMostRecent(n int, events []Event) m.RoomMatcher { - subset := events[len(events)-n:] return func(r sync3.Room) error { - return MatchRoomTimeline(subset)(r) + if len(events) < n { + return fmt.Errorf("list of wanted events has %d events, expected at least %d", len(events), n) + } + wantList := events[len(events)-n:] + if len(r.Timeline) < n { + return fmt.Errorf("timeline has %d events, expected at least %d", len(r.Timeline), n) + } + + gotList := r.Timeline[len(r.Timeline)-n:] + return eventsEqual(wantList, gotList) } } From bea931d8f26f400691d3f0b4671099bc0a35b42b Mon Sep 17 00:00:00 2001 From: David Robertson Date: Wed, 19 Apr 2023 16:20:08 +0100 Subject: [PATCH 31/39] Fix running unit test alongside other tests Some txns are commited, so the tests aren't isolated. --- state/event_table_test.go | 34 +++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/state/event_table_test.go b/state/event_table_test.go index d8d5c5ec..dce6922b 100644 --- a/state/event_table_test.go +++ b/state/event_table_test.go @@ -871,7 +871,7 @@ func TestRemoveUnsignedTXNID(t *testing.T) { } } -func TestSelectUnknownEventIDs(t *testing.T) { +func TestEventTableSelectUnknownEventIDs(t *testing.T) { db, close := connectToDB(t) defer close() txn, err := db.Beginx() @@ -881,55 +881,67 @@ func TestSelectUnknownEventIDs(t *testing.T) { defer txn.Rollback() const roomID = "!1:localhost" + // Note: there shouldn't be any other events with these IDs inserted before this + // transaction. $A and $B seem to be inserted and commit in TestEventTablePrevBatch. + const eventID1 = "$A-SelectUnknownEventIDs" + const eventID2 = "$B-SelectUnknownEventIDs" + knownEvents := []Event{ { Type: "m.room.create", StateKey: "", IsState: true, - ID: "$A", + ID: eventID1, RoomID: roomID, }, { Type: "m.room.name", StateKey: "", IsState: true, - ID: "$B", + ID: eventID2, RoomID: roomID, }, } table := NewEventTable(db) + // Check the event IDs haven't been added by another test. + gotEvents, err := table.SelectByIDs(txn, true, []string{eventID1, eventID2}) + if len(gotEvents) > 0 { + t.Fatalf("Event IDs already in use---commited by another test?") + } + // Insert the events _, err = table.Insert(txn, knownEvents, false) if err != nil { - t.Errorf("failed to insert event: %s", err) + t.Fatalf("failed to insert event: %s", err) } - gotEvents, err := table.SelectByIDs(txn, true, []string{"$A", "$B"}) + gotEvents, err = table.SelectByIDs(txn, true, []string{eventID1, eventID2}) if err != nil { t.Fatalf("failed to select events: %s", err) } - if (gotEvents[0].ID == "$A" && gotEvents[1].ID == "$B") || (gotEvents[0].ID == "$B" && gotEvents[1].ID == "$A") { + if (gotEvents[0].ID == eventID1 && gotEvents[1].ID == eventID2) || (gotEvents[0].ID == eventID2 && gotEvents[1].ID == eventID1) { t.Logf("Got expected event IDs after insert. NIDS: %s=%d, %s=%d", gotEvents[0].ID, gotEvents[0].NID, gotEvents[1].ID, gotEvents[1].NID) } else { - t.Logf("Event ID mismatch: expected $A and $B, got %v", gotEvents) + t.Fatalf("Event ID mismatch: expected $A-SelectUnknownEventIDs and $B-SelectUnknownEventIDs, got %v", gotEvents) } // Someone else tells us the state of the room is {A, C}. Query which of those // event IDs are unknown. - stateBlockIDs := []string{"$A", "$C"} + const unknownEventID = "$C-SelectUnknownEventIDs" + stateBlockIDs := []string{eventID1, unknownEventID} unknownIDs, err := table.SelectUnknownEventIDs(txn, stateBlockIDs) t.Logf("unknownIDs=%v", unknownIDs) if err != nil { - t.Errorf("failed to select unknown state events: %s", err) + t.Fatalf("failed to select unknown state events: %s", err) } // Only event C should be flagged as unknown. if len(unknownIDs) != 1 { t.Fatalf("Expected 1 unknown id, got %v", unknownIDs) } - _, ok := unknownIDs["$C"] + _, ok := unknownIDs[unknownEventID] if !ok { - t.Fatalf("Expected $C to be unknown to the DB, but it wasn't") + t.Fatalf("Expected $C-SelectUnknownEventIDs to be unknown to the DB, but it wasn't") } } From 168095232c781621a51a1bb30ed81cb0482f22cf Mon Sep 17 00:00:00 2001 From: David Robertson Date: Wed, 19 Apr 2023 17:27:38 +0100 Subject: [PATCH 32/39] Tidyup unit test --- tests-integration/poller_test.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/tests-integration/poller_test.go b/tests-integration/poller_test.go index 700bd1cb..a69c563e 100644 --- a/tests-integration/poller_test.go +++ b/tests-integration/poller_test.go @@ -123,10 +123,9 @@ func TestPollerHandlesUnknownStateEventsOnIncrementalSync(t *testing.T) { v3 := runTestServer(t, v2, pqString) defer v2.close() defer v3.close() - deviceAToken := "DEVICE_A_TOKEN" - v2.addAccount(alice, deviceAToken) + v2.addAccount(alice, aliceToken) const roomID = "!unimportant" - v2.queueResponse(deviceAToken, sync2.SyncResponse{ + v2.queueResponse(aliceToken, sync2.SyncResponse{ Rooms: sync2.SyncRoomsResponse{ Join: v2JoinTimeline(roomEvents{ roomID: roomID, @@ -134,7 +133,7 @@ func TestPollerHandlesUnknownStateEventsOnIncrementalSync(t *testing.T) { }), }, }) - res := v3.mustDoV3Request(t, deviceAToken, sync3.Request{ + res := v3.mustDoV3Request(t, aliceToken, sync3.Request{ Lists: map[string]sync3.RequestList{ "a": { Ranges: [][2]int64{{0, 20}}, @@ -142,7 +141,7 @@ func TestPollerHandlesUnknownStateEventsOnIncrementalSync(t *testing.T) { }, }) - t.Log("The poller receives a gappy incremental sync response with a state block") + t.Log("The poller receives a gappy incremental sync response with a state block. The power levels and room name have changed.") nameEvent := testutils.NewStateEvent( t, "m.room.name", @@ -161,7 +160,7 @@ func TestPollerHandlesUnknownStateEventsOnIncrementalSync(t *testing.T) { }, ) messageEvent := testutils.NewMessageEvent(t, alice, "hello") - v2.queueResponse(deviceAToken, sync2.SyncResponse{ + v2.queueResponse(aliceToken, sync2.SyncResponse{ Rooms: sync2.SyncRoomsResponse{ Join: map[string]sync2.SyncV2JoinResponse{ roomID: { @@ -178,7 +177,7 @@ func TestPollerHandlesUnknownStateEventsOnIncrementalSync(t *testing.T) { }, }) - res = v3.mustDoV3RequestWithPos(t, deviceAToken, res.Pos, sync3.Request{}) + res = v3.mustDoV3RequestWithPos(t, aliceToken, res.Pos, sync3.Request{}) m.MatchResponse( t, res, From 6e7d0cbee43edc5198dae561fe1614b6a4340480 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Wed, 19 Apr 2023 18:02:01 +0100 Subject: [PATCH 33/39] Test memberships are updated after gappy sync --- tests-integration/poller_test.go | 104 +++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) diff --git a/tests-integration/poller_test.go b/tests-integration/poller_test.go index a69c563e..e7946109 100644 --- a/tests-integration/poller_test.go +++ b/tests-integration/poller_test.go @@ -137,6 +137,9 @@ func TestPollerHandlesUnknownStateEventsOnIncrementalSync(t *testing.T) { Lists: map[string]sync3.RequestList{ "a": { Ranges: [][2]int64{{0, 20}}, + RoomSubscription: sync3.RoomSubscription{ + TimelineLimit: 10, + }, }, }, }) @@ -188,3 +191,104 @@ func TestPollerHandlesUnknownStateEventsOnIncrementalSync(t *testing.T) { ), ) } + +// Similar to TestPollerHandlesUnknownStateEventsOnIncrementalSync. Here we are testing +// that if Alice's poller sees Bob leave in a state block, the events seen in that +// timeline are not visible to Bob. +func TestPollerUpdatesRoomMemberTrackerOnGappySyncStateBlock(t *testing.T) { + pqString := testutils.PrepareDBConnectionString() + v2 := runTestV2Server(t) + v3 := runTestServer(t, v2, pqString) + defer v2.close() + defer v3.close() + v2.addAccount(alice, aliceToken) + v2.addAccount(bob, bobToken) + const roomID = "!unimportant" + + t.Log("Alice's poller does an initial sync. It sees that Alice and Bob share a room.") + initialTimeline := createRoomState(t, alice, time.Now()) + bobJoin := testutils.NewStateEvent( + t, + "m.room.member", + bob, + bob, + map[string]interface{}{"membership": "join"}, + ) + initialJoinBlock := v2JoinTimeline(roomEvents{ + roomID: roomID, + events: append(initialTimeline, bobJoin), + }) + v2.queueResponse(aliceToken, sync2.SyncResponse{ + Rooms: sync2.SyncRoomsResponse{Join: initialJoinBlock}, + }) + + t.Log("Alice makes an initial sliding sync request.") + aliceRes := v3.mustDoV3Request(t, aliceToken, sync3.Request{ + Lists: map[string]sync3.RequestList{ + "a": { + Ranges: [][2]int64{{0, 20}}, + RoomSubscription: sync3.RoomSubscription{ + TimelineLimit: 10, + }, + }, + }, + }) + + t.Log("Alice's poller receives a gappy incremental sync response. Bob has left in the gap. The timeline includes a message from Alice.") + bobLeave := testutils.NewStateEvent( + t, + "m.room.member", + bob, + bob, + map[string]interface{}{"membership": "leave"}, + ) + aliceMessage := testutils.NewMessageEvent(t, alice, "hello") + v2.queueResponse(aliceToken, sync2.SyncResponse{ + Rooms: sync2.SyncRoomsResponse{ + Join: map[string]sync2.SyncV2JoinResponse{ + roomID: { + State: sync2.EventsResponse{ + Events: []json.RawMessage{bobLeave}, + }, + Timeline: sync2.TimelineResponse{ + Events: []json.RawMessage{aliceMessage}, + Limited: true, + PrevBatch: "batchymcbatchface", + }, + }, + }, + }, + }) + + t.Log("Alice makes an incremental sliding sync request.") + aliceRes = v3.mustDoV3RequestWithPos(t, aliceToken, aliceRes.Pos, sync3.Request{}) + + t.Log("She should see Bob's leave event and her message at the end of the room timeline.") + m.MatchResponse( + t, + aliceRes, + m.MatchRoomSubscription( + roomID, + m.MatchRoomTimelineMostRecent(2, []json.RawMessage{bobLeave, aliceMessage}), + ), + ) + + t.Log("Bob makes an initial sliding sync request.") + bobRes := v3.mustDoV3Request(t, bobToken, sync3.Request{ + Lists: map[string]sync3.RequestList{ + "a": { + Ranges: [][2]int64{{0, 20}}, + RoomSubscription: sync3.RoomSubscription{ + TimelineLimit: 10, + }, + }, + }, + }) + t.Log("He should not see himself in the room.") + m.MatchResponse( + t, + bobRes, + m.MatchList("a", m.MatchV3Count(0)), + ) + +} From aa09d12d058db0cdf156e59c0e0891bfe7f30566 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Sat, 22 Apr 2023 00:24:48 +0100 Subject: [PATCH 34/39] Check the new query reports multiple unknown IDs --- state/event_table_test.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/state/event_table_test.go b/state/event_table_test.go index dce6922b..f8b0d3ef 100644 --- a/state/event_table_test.go +++ b/state/event_table_test.go @@ -928,20 +928,21 @@ func TestEventTableSelectUnknownEventIDs(t *testing.T) { // Someone else tells us the state of the room is {A, C}. Query which of those // event IDs are unknown. - const unknownEventID = "$C-SelectUnknownEventIDs" - stateBlockIDs := []string{eventID1, unknownEventID} + shouldBeUnknownIDs := []string{"$C-SelectUnknownEventIDs", "$D-SelectUnknownEventIDs"} + stateBlockIDs := append(shouldBeUnknownIDs, eventID1) unknownIDs, err := table.SelectUnknownEventIDs(txn, stateBlockIDs) t.Logf("unknownIDs=%v", unknownIDs) if err != nil { t.Fatalf("failed to select unknown state events: %s", err) } - // Only event C should be flagged as unknown. - if len(unknownIDs) != 1 { - t.Fatalf("Expected 1 unknown id, got %v", unknownIDs) + // Event C and D should be flagged as unknown. + if len(unknownIDs) != len(shouldBeUnknownIDs) { + t.Fatalf("Expected %d unknown ids, got %v", len(shouldBeUnknownIDs), unknownIDs) } - _, ok := unknownIDs[unknownEventID] - if !ok { - t.Fatalf("Expected $C-SelectUnknownEventIDs to be unknown to the DB, but it wasn't") + for _, unknownEventID := range shouldBeUnknownIDs { + if _, ok := unknownIDs[unknownEventID]; !ok { + t.Errorf("Expected %s to be unknown to the DB, but it wasn't", unknownEventID) + } } } From 8324f6e65483663b3381f3e92de5220d10d25252 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Sat, 22 Apr 2023 00:36:39 +0100 Subject: [PATCH 35/39] SelectByIDs has an ordering guarantee --- state/event_table.go | 6 ++++++ state/event_table_test.go | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/state/event_table.go b/state/event_table.go index d702dea6..d21a3050 100644 --- a/state/event_table.go +++ b/state/event_table.go @@ -149,6 +149,8 @@ func (t *EventTable) SelectHighestNID() (highest int64, err error) { } // Insert events into the event table. Returns a map of event ID to NID for new events only. +// The NIDs assigned to new events will respect the order of the given events, e.g. if +// we insert new events A and B in that order, then NID(A) < NID(B). func (t *EventTable) Insert(txn *sqlx.Tx, events []Event, checkFields bool) (map[string]int, error) { if checkFields { ensureFieldsSet(events) @@ -211,6 +213,10 @@ func (t *EventTable) SelectByNIDs(txn *sqlx.Tx, verifyAll bool, nids []int64) (e WHERE event_nid = ANY ($1) ORDER BY event_nid ASC;`, pq.Int64Array(nids)) } +// SelectByIDs fetches all events with the given event IDs from the DB as Event structs. +// If verifyAll is true, the function will check that each event ID has a matching +// event row in the database. The returned events are ordered by ascending NID; the +// order of the event IDs is irrelevant. func (t *EventTable) SelectByIDs(txn *sqlx.Tx, verifyAll bool, ids []string) (events []Event, err error) { wanted := 0 if verifyAll { diff --git a/state/event_table_test.go b/state/event_table_test.go index f8b0d3ef..08eb0a0a 100644 --- a/state/event_table_test.go +++ b/state/event_table_test.go @@ -920,7 +920,7 @@ func TestEventTableSelectUnknownEventIDs(t *testing.T) { if err != nil { t.Fatalf("failed to select events: %s", err) } - if (gotEvents[0].ID == eventID1 && gotEvents[1].ID == eventID2) || (gotEvents[0].ID == eventID2 && gotEvents[1].ID == eventID1) { + if gotEvents[0].ID == eventID1 && gotEvents[1].ID == eventID2 { t.Logf("Got expected event IDs after insert. NIDS: %s=%d, %s=%d", gotEvents[0].ID, gotEvents[0].NID, gotEvents[1].ID, gotEvents[1].NID) } else { t.Fatalf("Event ID mismatch: expected $A-SelectUnknownEventIDs and $B-SelectUnknownEventIDs, got %v", gotEvents) From 6c9aa094ab10f4ddb066562841bf0f1f80773416 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 24 Apr 2023 12:04:23 +0100 Subject: [PATCH 36/39] Pass room ID to new query --- state/accumulator.go | 2 +- state/event_table.go | 8 ++++---- state/event_table_test.go | 6 +++--- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/state/accumulator.go b/state/accumulator.go index b0de435b..0a3e53aa 100644 --- a/state/accumulator.go +++ b/state/accumulator.go @@ -187,7 +187,7 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (Initia eventIDToRawEvent[eventID.Str] = state[i] eventIDs[i] = eventID.Str } - unknownEventIDs, err := a.eventsTable.SelectUnknownEventIDs(txn, eventIDs) + unknownEventIDs, err := a.eventsTable.SelectUnknownEventIDs(txn, roomID, eventIDs) if err != nil { return fmt.Errorf("error determing which event IDs are unknown: %s", err) } diff --git a/state/event_table.go b/state/event_table.go index d21a3050..3f35b6a0 100644 --- a/state/event_table.go +++ b/state/event_table.go @@ -267,18 +267,18 @@ func (t *EventTable) SelectStrippedEventsByIDs(txn *sqlx.Tx, verifyAll bool, ids // SelectUnknownEventIDs accepts a list of event IDs and returns the subset of those which are not known to the DB. // It MUST be called within a transaction, or else will panic. -func (t *EventTable) SelectUnknownEventIDs(txn *sqlx.Tx, maybeUnknownEventIDs []string) (map[string]struct{}, error) { +func (t *EventTable) SelectUnknownEventIDs(txn *sqlx.Tx, roomID string, maybeUnknownEventIDs []string) (map[string]struct{}, error) { // Note: in practice, the order of rows returned matches the order of rows of // array entries. But I don't think that's guaranteed. Return an (unordered) set // out of paranoia. queryStr := ` - WITH maybe_unknown_events(event_id) AS (SELECT unnest($1::text[])) + WITH maybe_unknown_events(event_id) AS (SELECT unnest($2::text[])) SELECT event_id FROM maybe_unknown_events LEFT JOIN syncv3_events USING(event_id) - WHERE event_nid IS NULL;` + WHERE room_id = $1 AND event_nid IS NULL;` var unknownEventIDs []string - if err := txn.Select(&unknownEventIDs, queryStr, pq.StringArray(maybeUnknownEventIDs)); err != nil { + if err := txn.Select(&unknownEventIDs, queryStr, roomID, pq.StringArray(maybeUnknownEventIDs)); err != nil { return nil, err } unknownMap := make(map[string]struct{}, len(unknownEventIDs)) diff --git a/state/event_table_test.go b/state/event_table_test.go index 08eb0a0a..72f179a2 100644 --- a/state/event_table_test.go +++ b/state/event_table_test.go @@ -879,10 +879,10 @@ func TestEventTableSelectUnknownEventIDs(t *testing.T) { t.Fatalf("failed to start txn: %s", err) } defer txn.Rollback() - const roomID = "!1:localhost" + const roomID = "!1TestEventTableSelectUnknownEventIDs:localhost" // Note: there shouldn't be any other events with these IDs inserted before this - // transaction. $A and $B seem to be inserted and commit in TestEventTablePrevBatch. + // transaction. "$A" and "$B" seem to be inserted and commit in TestEventTablePrevBatch. const eventID1 = "$A-SelectUnknownEventIDs" const eventID2 = "$B-SelectUnknownEventIDs" @@ -930,7 +930,7 @@ func TestEventTableSelectUnknownEventIDs(t *testing.T) { // event IDs are unknown. shouldBeUnknownIDs := []string{"$C-SelectUnknownEventIDs", "$D-SelectUnknownEventIDs"} stateBlockIDs := append(shouldBeUnknownIDs, eventID1) - unknownIDs, err := table.SelectUnknownEventIDs(txn, stateBlockIDs) + unknownIDs, err := table.SelectUnknownEventIDs(txn, roomID, stateBlockIDs) t.Logf("unknownIDs=%v", unknownIDs) if err != nil { t.Fatalf("failed to select unknown state events: %s", err) From 275538487b93eaf94ef156de08aa04ec6a132afc Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 24 Apr 2023 12:45:11 +0100 Subject: [PATCH 37/39] Revert "Pass room ID to new query" This reverts commit 6c9aa094ab10f4ddb066562841bf0f1f80773416. --- state/accumulator.go | 2 +- state/event_table.go | 8 ++++---- state/event_table_test.go | 6 +++--- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/state/accumulator.go b/state/accumulator.go index 0a3e53aa..b0de435b 100644 --- a/state/accumulator.go +++ b/state/accumulator.go @@ -187,7 +187,7 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (Initia eventIDToRawEvent[eventID.Str] = state[i] eventIDs[i] = eventID.Str } - unknownEventIDs, err := a.eventsTable.SelectUnknownEventIDs(txn, roomID, eventIDs) + unknownEventIDs, err := a.eventsTable.SelectUnknownEventIDs(txn, eventIDs) if err != nil { return fmt.Errorf("error determing which event IDs are unknown: %s", err) } diff --git a/state/event_table.go b/state/event_table.go index 3f35b6a0..d21a3050 100644 --- a/state/event_table.go +++ b/state/event_table.go @@ -267,18 +267,18 @@ func (t *EventTable) SelectStrippedEventsByIDs(txn *sqlx.Tx, verifyAll bool, ids // SelectUnknownEventIDs accepts a list of event IDs and returns the subset of those which are not known to the DB. // It MUST be called within a transaction, or else will panic. -func (t *EventTable) SelectUnknownEventIDs(txn *sqlx.Tx, roomID string, maybeUnknownEventIDs []string) (map[string]struct{}, error) { +func (t *EventTable) SelectUnknownEventIDs(txn *sqlx.Tx, maybeUnknownEventIDs []string) (map[string]struct{}, error) { // Note: in practice, the order of rows returned matches the order of rows of // array entries. But I don't think that's guaranteed. Return an (unordered) set // out of paranoia. queryStr := ` - WITH maybe_unknown_events(event_id) AS (SELECT unnest($2::text[])) + WITH maybe_unknown_events(event_id) AS (SELECT unnest($1::text[])) SELECT event_id FROM maybe_unknown_events LEFT JOIN syncv3_events USING(event_id) - WHERE room_id = $1 AND event_nid IS NULL;` + WHERE event_nid IS NULL;` var unknownEventIDs []string - if err := txn.Select(&unknownEventIDs, queryStr, roomID, pq.StringArray(maybeUnknownEventIDs)); err != nil { + if err := txn.Select(&unknownEventIDs, queryStr, pq.StringArray(maybeUnknownEventIDs)); err != nil { return nil, err } unknownMap := make(map[string]struct{}, len(unknownEventIDs)) diff --git a/state/event_table_test.go b/state/event_table_test.go index 72f179a2..08eb0a0a 100644 --- a/state/event_table_test.go +++ b/state/event_table_test.go @@ -879,10 +879,10 @@ func TestEventTableSelectUnknownEventIDs(t *testing.T) { t.Fatalf("failed to start txn: %s", err) } defer txn.Rollback() - const roomID = "!1TestEventTableSelectUnknownEventIDs:localhost" + const roomID = "!1:localhost" // Note: there shouldn't be any other events with these IDs inserted before this - // transaction. "$A" and "$B" seem to be inserted and commit in TestEventTablePrevBatch. + // transaction. $A and $B seem to be inserted and commit in TestEventTablePrevBatch. const eventID1 = "$A-SelectUnknownEventIDs" const eventID2 = "$B-SelectUnknownEventIDs" @@ -930,7 +930,7 @@ func TestEventTableSelectUnknownEventIDs(t *testing.T) { // event IDs are unknown. shouldBeUnknownIDs := []string{"$C-SelectUnknownEventIDs", "$D-SelectUnknownEventIDs"} stateBlockIDs := append(shouldBeUnknownIDs, eventID1) - unknownIDs, err := table.SelectUnknownEventIDs(txn, roomID, stateBlockIDs) + unknownIDs, err := table.SelectUnknownEventIDs(txn, stateBlockIDs) t.Logf("unknownIDs=%v", unknownIDs) if err != nil { t.Fatalf("failed to select unknown state events: %s", err) From 82d1d587b93107d0593d730f5c684a3bac7b90fb Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 24 Apr 2023 13:03:21 +0100 Subject: [PATCH 38/39] Update integration test --- tests-integration/poller_test.go | 69 ++++++++++++++++++-------------- 1 file changed, 39 insertions(+), 30 deletions(-) diff --git a/tests-integration/poller_test.go b/tests-integration/poller_test.go index e7946109..10128b41 100644 --- a/tests-integration/poller_test.go +++ b/tests-integration/poller_test.go @@ -205,7 +205,7 @@ func TestPollerUpdatesRoomMemberTrackerOnGappySyncStateBlock(t *testing.T) { v2.addAccount(bob, bobToken) const roomID = "!unimportant" - t.Log("Alice's poller does an initial sync. It sees that Alice and Bob share a room.") + t.Log("Alice and Bob's pollers initial sync. Both see the same state: that Alice and Bob share a room.") initialTimeline := createRoomState(t, alice, time.Now()) bobJoin := testutils.NewStateEvent( t, @@ -221,9 +221,12 @@ func TestPollerUpdatesRoomMemberTrackerOnGappySyncStateBlock(t *testing.T) { v2.queueResponse(aliceToken, sync2.SyncResponse{ Rooms: sync2.SyncRoomsResponse{Join: initialJoinBlock}, }) + v2.queueResponse(aliceToken, sync2.SyncResponse{ + Rooms: sync2.SyncRoomsResponse{Join: initialJoinBlock}, + }) t.Log("Alice makes an initial sliding sync request.") - aliceRes := v3.mustDoV3Request(t, aliceToken, sync3.Request{ + syncRequest := sync3.Request{ Lists: map[string]sync3.RequestList{ "a": { Ranges: [][2]int64{{0, 20}}, @@ -232,7 +235,35 @@ func TestPollerUpdatesRoomMemberTrackerOnGappySyncStateBlock(t *testing.T) { }, }, }, - }) + } + aliceRes := v3.mustDoV3Request(t, aliceToken, syncRequest) + + t.Log("Alice sees herself and Bob joined to the room.") + m.MatchResponse( + t, + aliceRes, + m.MatchList( + "a", + m.MatchV3Count(1), + m.MatchV3Ops(m.MatchV3SyncOp(0, 0, []string{roomID})), + ), + m.MatchRoomSubscription(roomID, m.MatchRoomTimelineMostRecent(1, []json.RawMessage{bobJoin})), + ) + + t.Log("Bob makes an initial sliding sync request.") + bobRes := v3.mustDoV3Request(t, bobToken, syncRequest) + + t.Log("Bob sees himself and Alice joined to the room.") + m.MatchResponse( + t, + bobRes, + m.MatchList( + "a", + m.MatchV3Count(1), + m.MatchV3Ops(m.MatchV3SyncOp(0, 0, []string{roomID})), + ), + m.MatchRoomSubscription(roomID, m.MatchJoinCount(2)), + ) t.Log("Alice's poller receives a gappy incremental sync response. Bob has left in the gap. The timeline includes a message from Alice.") bobLeave := testutils.NewStateEvent( @@ -260,35 +291,13 @@ func TestPollerUpdatesRoomMemberTrackerOnGappySyncStateBlock(t *testing.T) { }, }) - t.Log("Alice makes an incremental sliding sync request.") - aliceRes = v3.mustDoV3RequestWithPos(t, aliceToken, aliceRes.Pos, sync3.Request{}) - - t.Log("She should see Bob's leave event and her message at the end of the room timeline.") - m.MatchResponse( - t, - aliceRes, - m.MatchRoomSubscription( - roomID, - m.MatchRoomTimelineMostRecent(2, []json.RawMessage{bobLeave, aliceMessage}), - ), - ) - - t.Log("Bob makes an initial sliding sync request.") - bobRes := v3.mustDoV3Request(t, bobToken, sync3.Request{ - Lists: map[string]sync3.RequestList{ - "a": { - Ranges: [][2]int64{{0, 20}}, - RoomSubscription: sync3.RoomSubscription{ - TimelineLimit: 10, - }, - }, - }, - }) - t.Log("He should not see himself in the room.") + t.Log("Bob makes an incremental sliding sync request.") + bobRes = v3.mustDoV3RequestWithPos(t, bobToken, bobRes.Pos, sync3.Request{}) + t.Log("He should see his leave event in the room timeline.") m.MatchResponse( t, bobRes, - m.MatchList("a", m.MatchV3Count(0)), + m.MatchList("a", m.MatchV3Count(1)), + m.MatchRoomSubscription(roomID, m.MatchRoomTimelineMostRecent(1, []json.RawMessage{bobLeave})), ) - } From 3274cf2daf2599f051ae45e87e4722dbac9b07cc Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 24 Apr 2023 17:43:49 +0100 Subject: [PATCH 39/39] typo Co-authored-by: kegsay --- tests-integration/poller_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests-integration/poller_test.go b/tests-integration/poller_test.go index 10128b41..d025dfe4 100644 --- a/tests-integration/poller_test.go +++ b/tests-integration/poller_test.go @@ -221,7 +221,7 @@ func TestPollerUpdatesRoomMemberTrackerOnGappySyncStateBlock(t *testing.T) { v2.queueResponse(aliceToken, sync2.SyncResponse{ Rooms: sync2.SyncRoomsResponse{Join: initialJoinBlock}, }) - v2.queueResponse(aliceToken, sync2.SyncResponse{ + v2.queueResponse(bobToken, sync2.SyncResponse{ Rooms: sync2.SyncRoomsResponse{Join: initialJoinBlock}, })