Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inject state blocks from incremental polls into the room timeline #71

Merged
merged 40 commits into from
Apr 24, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
38c1bda
Integration test
Apr 13, 2023
1d59167
E2E test case draft
Apr 13, 2023
46059df
WIP: update test cases
Apr 14, 2023
419e1ab
Update test case again
Apr 17, 2023
4b90454
Don't reselect NIDs after initialising a room
Apr 17, 2023
a2cba6e
We don't need `eventIDs` either
Apr 17, 2023
c19b561
Merge branch 'dmr/dont-select-after-insert-returning' into dmr/gappy-…
Apr 17, 2023
666823d
Introduce return struct for Initialise
Apr 17, 2023
4ba80d2
Initialise: handle state blocks from a gappy sync
Apr 17, 2023
9fdb001
TODO comment
Apr 17, 2023
33b174d
Fixup test code
Apr 17, 2023
ac9651c
Propagate err message
Apr 17, 2023
5d8560c
Fix db query
Apr 17, 2023
b7a8e7d
Improve logging
Apr 17, 2023
e3008e6
Fix the logging
Apr 17, 2023
2406da5
TODO note
Apr 17, 2023
f28f7d0
Return set of unknown events from the db
Apr 18, 2023
9d53a87
Simply Initialise to bail out early
Apr 18, 2023
2bae784
Propagate prepend events to poller, and inject
Apr 18, 2023
5621423
Fix tests
Apr 18, 2023
9af0471
Always look for unknown state events
Apr 18, 2023
b5893b1
Better error checking in test
Apr 18, 2023
aa07188
Fixup test
Apr 18, 2023
7f03a3d
Use test helper instead
Apr 19, 2023
e6aac43
SelectUnknownEventIDs expects a txn
Apr 19, 2023
b54ba7c
Poller test description
Apr 19, 2023
5dd4315
Test new events table query
Apr 19, 2023
76066f9
Tidy up accumulator event ID handling
Apr 19, 2023
5a9fae1
Fix capitalisation
Apr 19, 2023
b32e5da
Fix test function args too
Apr 19, 2023
00dd396
Fix MatchRoomTimelineMostRecent
Apr 19, 2023
bea931d
Fix running unit test alongside other tests
Apr 19, 2023
1680952
Tidyup unit test
Apr 19, 2023
6e7d0cb
Test memberships are updated after gappy sync
Apr 19, 2023
aa09d12
Check the new query reports multiple unknown IDs
Apr 21, 2023
8324f6e
SelectByIDs has an ordering guarantee
Apr 21, 2023
6c9aa09
Pass room ID to new query
Apr 24, 2023
2755384
Revert "Pass room ID to new query"
Apr 24, 2023
82d1d58
Update integration test
Apr 24, 2023
3274cf2
typo
Apr 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 120 additions & 42 deletions state/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,34 +131,59 @@ func (a *Accumulator) roomInfoDelta(roomID string, events []Event) RoomInfo {
}
}

type InitialiseResult struct {
// AddedEvents is true iff this call to Initialise added new state events to the DB.
AddedEvents bool
// SnapshotID is the ID of the snapshot which incorporates all added events.
// It has no meaning if AddedEvents is false.
SnapshotID int64
// PrependTimelineEvents is empty if the room was not initialised prior to this call.
// Otherwise, it is an order-preserving subset of the `state` argument to Initialise
// containing all events that were not persisted prior to the Initialise call. These
// should be prepended to the room timeline by the caller.
PrependTimelineEvents []json.RawMessage
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
}

// Initialise starts a new sync accumulator for the given room using the given state as a baseline.
// This will only take effect if this is the first time the v3 server has seen this room, and it wasn't
// possible to get all events up to the create event (e.g Matrix HQ). Returns true if this call actually
// added new events, along with the snapshot NID.
//
// possible to get all events up to the create event (e.g Matrix HQ).
// This function:
// - Stores these events
// - Sets up the current snapshot based on the state list given.
func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (bool, int64, error) {
func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (res InitialiseResult, outerErr error) {
if len(state) == 0 {
return false, 0, nil
return res, nil
}
addedEvents := false
var snapID int64
err := sqlutil.WithTransaction(a.db, func(txn *sqlx.Tx) error {
// Attempt to short-circuit. This has to be done inside a transaction to make sure
outerErr = sqlutil.WithTransaction(a.db, func(txn *sqlx.Tx) error {
// This has to be done inside a transaction to make sure
// we don't race with multiple calls to Initialise with the same room ID.
snapshotID, err := a.roomsTable.CurrentAfterSnapshotID(txn, roomID)
if err != nil {
return fmt.Errorf("error fetching snapshot id for room %s: %s", roomID, err)
}
if snapshotID > 0 {
// we only initialise rooms once
logger.Info().Str("room_id", roomID).Int64("snapshot_id", snapshotID).Msg("Accumulator.Initialise called but current snapshot already exists, bailing early")
return nil
unknownRoom := snapshotID == 0
if !unknownRoom {
// TODO: suppose Alice's poller has already initialised this room. Then Bob
// shows up and we make a brand new poller for him. When his poller initial
// syncs, the state block will be passed here. If that block includes a
// state event that Alice's poller hasn't seen, we'll now end up adding it
// to the DB here---without a snapshot ID---instead of in an Accumulate
// call (with a snapshot ID).
//
// Can we prevent this by passing in a bool initialPollerSync arg?
// (Return early if !initialPollerSync and !unknownRoom)
const warningMsg = "Accumulator.Initialise called when current snapshot already exists. Patching in events"
logger.Warn().Str("room_id", roomID).Int64("snapshot_id", snapshotID).Msg(warningMsg)
sentry.WithScope(func(scope *sentry.Scope) {
scope.SetContext("sliding-sync", map[string]interface{}{
"room_id": roomID,
"snapshot_id": snapshotID,
})
sentry.CaptureException(fmt.Errorf(warningMsg))
})
}

// Insert the events
// Parse the events
events := make([]Event, len(state))
for i := range events {
events[i] = Event{
Expand All @@ -167,10 +192,45 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (bool,
IsState: true,
}
}
if err := ensureFieldsSet(events); err != nil {
if err = ensureFieldsSet(events); err != nil {
return fmt.Errorf("events malformed: %s", err)
}
eventIDToNID, err := a.eventsTable.Insert(txn, events, false)

// Determine which events should be inserted.
var insertEvents []Event
if unknownRoom {
insertEvents = events
} else {
// Select the events which do not have a NID
eventIDs := make([]string, len(events))
for i := range events {
eventIDs[i] = events[i].ID
}
unknownEventIDs, err := a.eventsTable.SelectUnknownEventIDs(txn, eventIDs)
if err != nil {
return fmt.Errorf("error determing which event IDs are unknown")
}
if len(unknownEventIDs) == 0 {
// All events known. Odd, but nothing to do.
return nil
}
Outer:
for i := range events {
for j := range unknownEventIDs {
if events[i].ID == unknownEventIDs[j] {
insertEvents = append(insertEvents, events[i])
continue Outer
}
}
}
}
if len(insertEvents) == 0 {
// All events known---nothing to do here.
return nil
}

// Insert new events
eventIDToNID, err := a.eventsTable.Insert(txn, insertEvents, false)
if err != nil {
return fmt.Errorf("failed to insert events: %w", err)
}
Expand All @@ -183,33 +243,45 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (bool,
return nil
}

// pull out the event NIDs we just inserted
eventIDs := make([]string, len(events))
membershipEventIDs := make(map[string]struct{}, len(events))
for i := range eventIDs {
eventIDs[i] = events[i].ID
if events[i].Type == "m.room.member" {
membershipEventIDs[events[i].ID] = struct{}{}
// Determine the NIDs in the snapshot which includes the new state events
var memberNIDs, otherNIDs []int64
if unknownRoom {
// Split the new NIDs into membership and nonmemberships.
membershipEventIDs := make(map[string]struct{}, len(events))
for _, event := range events {
if event.Type == "m.room.member" {
membershipEventIDs[event.ID] = struct{}{}
}
}
}
idToNIDs, err := a.eventsTable.SelectNIDsByIDs(txn, eventIDs)
if err != nil {
return fmt.Errorf("failed to select NIDs for inserted events: %w", err)
}
if len(idToNIDs) != len(eventIDs) {
return fmt.Errorf("missing events just inserted, asked for %v got %v", eventIDs, idToNIDs)
}
memberNIDs := make([]int64, 0, len(idToNIDs))
otherNIDs := make([]int64, 0, len(idToNIDs))
for evID, nid := range idToNIDs {
if _, exists := membershipEventIDs[evID]; exists {
memberNIDs = append(memberNIDs, nid)
} else {
otherNIDs = append(otherNIDs, nid)
memberNIDs = make([]int64, 0, len(eventIDToNID))
otherNIDs = make([]int64, 0, len(eventIDToNID))
for evID, nid := range eventIDToNID {
if _, exists := membershipEventIDs[evID]; exists {
memberNIDs = append(memberNIDs, int64(nid))
} else {
otherNIDs = append(otherNIDs, int64(nid))
}
}
if err != nil {
return fmt.Errorf("failed to insert snapshot: %w", err)
}
} else {
// Update the existing snapshot, then extract NIDs.
stateEvents, err := a.strippedEventsForSnapshot(txn, snapshotID)
if err != nil {
return fmt.Errorf("failed to load stripped state events for snapshot %d: %s", snapshotID, err)
}
var newStripped StrippedEvents
for _, ev := range insertEvents {
stateEvents, _, err = a.calculateNewSnapshot(stateEvents, ev)
if err != nil {
return fmt.Errorf("failed to calculateNewSnapshot: %s", err)
}
}
memberNIDs, otherNIDs = newStripped.NIDs()
}

// Make a current snapshot
// Insert the new snapshot
snapshot := &SnapshotRow{
RoomID: roomID,
MembershipEvents: pq.Int64Array(memberNIDs),
Expand All @@ -219,7 +291,13 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (bool,
if err != nil {
return fmt.Errorf("failed to insert snapshot: %w", err)
}
addedEvents = true
res.AddedEvents = true
if !unknownRoom {
res.PrependTimelineEvents = make([]json.RawMessage, len(insertEvents))
for i := range insertEvents {
res.PrependTimelineEvents[i] = insertEvents[i].JSON
}
}
latestNID := int64(0)
for _, nid := range otherNIDs {
if nid > latestNID {
Expand All @@ -244,10 +322,10 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (bool,
// will have an associated state snapshot ID on the event.

// Set the snapshot ID as the current state
snapID = snapshot.SnapshotID
res.SnapshotID = snapshot.SnapshotID
return a.roomsTable.Upsert(txn, info, snapshot.SnapshotID, latestNID)
})
return addedEvents, snapID, err
return res, outerErr
}

// Accumulate internal state from a user's sync response. The timeline order MUST be in the order
Expand Down
22 changes: 11 additions & 11 deletions state/accumulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ func TestAccumulatorInitialise(t *testing.T) {
db, close := connectToDB(t)
defer close()
accumulator := NewAccumulator(db)
added, initSnapID, err := accumulator.Initialise(roomID, roomEvents)
res, err := accumulator.Initialise(roomID, roomEvents)
if err != nil {
t.Fatalf("falied to Initialise accumulator: %s", err)
}
if !added {
if !res.AddedEvents {
t.Fatalf("didn't add events, wanted it to")
}

Expand All @@ -45,8 +45,8 @@ func TestAccumulatorInitialise(t *testing.T) {
if snapID == 0 {
t.Fatalf("Initialise did not store a current snapshot")
}
if snapID != initSnapID {
t.Fatalf("Initialise returned wrong snapshot ID, got %v want %v", initSnapID, snapID)
if snapID != res.SnapshotID {
t.Fatalf("Initialise returned wrong snapshot ID, got %v want %v", res.SnapshotID, snapID)
}

// this snapshot should have 1 member event and 2 other events in it
Expand Down Expand Up @@ -80,11 +80,11 @@ func TestAccumulatorInitialise(t *testing.T) {
}

// Subsequent calls do nothing and are not an error
added, _, err = accumulator.Initialise(roomID, roomEvents)
res, err = accumulator.Initialise(roomID, roomEvents)
if err != nil {
t.Fatalf("falied to Initialise accumulator: %s", err)
}
if added {
if res.AddedEvents {
t.Fatalf("added events when it shouldn't have")
}
}
Expand All @@ -99,7 +99,7 @@ func TestAccumulatorAccumulate(t *testing.T) {
db, close := connectToDB(t)
defer close()
accumulator := NewAccumulator(db)
_, _, err := accumulator.Initialise(roomID, roomEvents)
_, err := accumulator.Initialise(roomID, roomEvents)
if err != nil {
t.Fatalf("failed to Initialise accumulator: %s", err)
}
Expand Down Expand Up @@ -195,7 +195,7 @@ func TestAccumulatorDelta(t *testing.T) {
db, close := connectToDB(t)
defer close()
accumulator := NewAccumulator(db)
_, _, err := accumulator.Initialise(roomID, nil)
_, err := accumulator.Initialise(roomID, nil)
if err != nil {
t.Fatalf("failed to Initialise accumulator: %s", err)
}
Expand Down Expand Up @@ -244,7 +244,7 @@ func TestAccumulatorMembershipLogs(t *testing.T) {
db, close := connectToDB(t)
defer close()
accumulator := NewAccumulator(db)
_, _, err := accumulator.Initialise(roomID, nil)
_, err := accumulator.Initialise(roomID, nil)
if err != nil {
t.Fatalf("failed to Initialise accumulator: %s", err)
}
Expand Down Expand Up @@ -384,7 +384,7 @@ func TestAccumulatorDupeEvents(t *testing.T) {
defer close()
accumulator := NewAccumulator(db)
roomID := "!buggy:localhost"
_, _, err := accumulator.Initialise(roomID, joinRoom.State.Events)
_, err := accumulator.Initialise(roomID, joinRoom.State.Events)
if err != nil {
t.Fatalf("failed to Initialise accumulator: %s", err)
}
Expand Down Expand Up @@ -426,7 +426,7 @@ func TestAccumulatorMisorderedGraceful(t *testing.T) {
accumulator := NewAccumulator(db)
roomID := "!TestAccumulatorStateReset:localhost"
// Create a room with initial state A,C
_, _, err := accumulator.Initialise(roomID, []json.RawMessage{
_, err := accumulator.Initialise(roomID, []json.RawMessage{
eventA, eventC,
})
if err != nil {
Expand Down
19 changes: 19 additions & 0 deletions state/event_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,25 @@ func (t *EventTable) SelectStrippedEventsByIDs(txn *sqlx.Tx, verifyAll bool, ids

}

// SelectUnknownEventIDs accepts a list of event IDs and returns the subset of those which are not known to the DB.
// The order of event IDs in the return value is not guaranteed.
func (t *EventTable) SelectUnknownEventIDs(txn *sqlx.Tx, maybeUnknownEventIDs []string) ([]string, error) {
queryStr := `
WITH maybe_unknown_events(event_id) AS (SELECT unnest($1::text[]))
SELECT event_id
FROM maybe_unknown_events LEFT JOIN syncv3_events USING(event_id)
WHERE event_nid IS NULL;`
Comment on lines +275 to +278
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did test this in a Postgres playground, but I think this might be worth a unit test.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please do. There is event_table_test.go to do that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't use this style of query anywhere in the proxy atm. I'm guessing you do this so you can "select then negate" all on the DB side? I'd expected to see something more like:

SELECT event_id FROM syncv3_events WHERE room_id=$1 AND event_id=ANY($2)

but then you'd need to negate on the Go side to find the missing ones. I wonder which is better.

Copy link
Contributor Author

@DMRobertson DMRobertson Apr 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the thinking was basically to get the DB to tell me which events are unknown, rather than getting it to tell me which events are known and then computing the complement. (I'm assuming that we know about most state events, so we may as well get the DB to transmit the smaller set back to us.)

I don't think it makes a huge difference if the state block and room are small. I'm slightly worried about hitting this query for Matrix HQ with a state block from a new poller's initial sync.


var unknownEventIDs []string
var err error
if txn != nil {
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
err = txn.Select(&unknownEventIDs, queryStr, maybeUnknownEventIDs)
} else {
err = t.db.Select(&unknownEventIDs, queryStr, maybeUnknownEventIDs)
}
return unknownEventIDs, err
}

// UpdateBeforeSnapshotID sets the before_state_snapshot_id field to `snapID` for the given NIDs.
func (t *EventTable) UpdateBeforeSnapshotID(txn *sqlx.Tx, eventNID, snapID, replacesNID int64) error {
_, err := txn.Exec(
Expand Down
2 changes: 1 addition & 1 deletion state/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func (s *Storage) Accumulate(roomID, prevBatch string, timeline []json.RawMessag
return s.accumulator.Accumulate(roomID, prevBatch, timeline)
}

func (s *Storage) Initialise(roomID string, state []json.RawMessage) (bool, int64, error) {
func (s *Storage) Initialise(roomID string, state []json.RawMessage) (InitialiseResult, error) {
return s.accumulator.Initialise(roomID, state)
}

Expand Down
6 changes: 3 additions & 3 deletions state/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func TestVisibleEventNIDsBetween(t *testing.T) {
},
}
for roomID, eventMap := range roomIDToEventMap {
_, _, err := store.Initialise(roomID, eventMap)
_, err := store.Initialise(roomID, eventMap)
if err != nil {
t.Fatalf("Initialise on %s failed: %s", roomID, err)
}
Expand Down Expand Up @@ -512,7 +512,7 @@ func TestStorageLatestEventsInRoomsPrevBatch(t *testing.T) {
},
}

_, _, err := store.Initialise(roomID, stateEvents)
_, err := store.Initialise(roomID, stateEvents)
if err != nil {
t.Fatalf("failed to initialise: %s", err)
}
Expand Down Expand Up @@ -616,7 +616,7 @@ func TestGlobalSnapshot(t *testing.T) {
store := NewStorage(postgresConnectionString)
defer store.Teardown()
for roomID, stateEvents := range roomIDToEventMap {
_, _, err := store.Initialise(roomID, stateEvents)
_, err := store.Initialise(roomID, stateEvents)
assertNoError(t, err)
}
snapshot, err := store.GlobalSnapshot()
Expand Down
6 changes: 3 additions & 3 deletions sync2/handler2/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,19 +241,19 @@ func (h *Handler) Accumulate(deviceID, roomID, prevBatch string, timeline []json
}

func (h *Handler) Initialise(roomID string, state []json.RawMessage) {
added, snapID, err := h.Store.Initialise(roomID, state)
res, err := h.Store.Initialise(roomID, state)
if err != nil {
logger.Err(err).Int("state", len(state)).Str("room", roomID).Msg("V2: failed to initialise room")
sentry.CaptureException(err)
return
}
if !added {
if !res.AddedEvents {
// no new events
return
}
h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2Initialise{
RoomID: roomID,
SnapshotNID: snapID,
SnapshotNID: res.SnapshotID,
})
}

Expand Down
Loading