Skip to content

Commit

Permalink
Merge pull request #71 from matrix-org/dmr/gappy-state
Browse files Browse the repository at this point in the history
Inject state blocks from incremental polls into the room timeline
  • Loading branch information
David Robertson authored Apr 24, 2023
2 parents 6d0fb2b + 3274cf2 commit 206ee81
Show file tree
Hide file tree
Showing 14 changed files with 588 additions and 64 deletions.
88 changes: 59 additions & 29 deletions state/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,20 +131,37 @@ func (a *Accumulator) roomInfoDelta(roomID string, events []Event) RoomInfo {
}
}

type InitialiseResult struct {
// AddedEvents is true iff this call to Initialise added new state events to the DB.
AddedEvents bool
// SnapshotID is the ID of the snapshot which incorporates all added events.
// It has no meaning if AddedEvents is False.
SnapshotID int64
// PrependTimelineEvents is empty if the room was not initialised prior to this call.
// Otherwise, it is an order-preserving subset of the `state` argument to Initialise
// containing all events that were not persisted prior to the Initialise call. These
// should be prepended to the room timeline by the caller.
PrependTimelineEvents []json.RawMessage
}

// Initialise starts a new sync accumulator for the given room using the given state as a baseline.
// This will only take effect if this is the first time the v3 server has seen this room, and it wasn't
// possible to get all events up to the create event (e.g Matrix HQ). Returns true if this call actually
// added new events, along with the snapshot NID.
//
// This will only take effect if this is the first time the v3 server has seen this room, and it wasn't
// possible to get all events up to the create event (e.g Matrix HQ).
// This function:
// - Stores these events
// - Sets up the current snapshot based on the state list given.
func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (bool, int64, error) {
//
// If the v3 server has seen this room before, this function
// - queries the DB to determine which state events are known to th server,
// - returns (via InitialiseResult.PrependTimelineEvents) a slice of unknown state events,
//
// and otherwise does nothing.
func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (InitialiseResult, error) {
var res InitialiseResult
if len(state) == 0 {
return false, 0, nil
return res, nil
}
addedEvents := false
var snapID int64
err := sqlutil.WithTransaction(a.db, func(txn *sqlx.Tx) error {
// Attempt to short-circuit. This has to be done inside a transaction to make sure
// we don't race with multiple calls to Initialise with the same room ID.
Expand All @@ -153,8 +170,30 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (bool,
return fmt.Errorf("error fetching snapshot id for room %s: %s", roomID, err)
}
if snapshotID > 0 {
// we only initialise rooms once
logger.Info().Str("room_id", roomID).Int64("snapshot_id", snapshotID).Msg("Accumulator.Initialise called but current snapshot already exists, bailing early")
// Poller A has received a gappy sync v2 response with a state block, and
// we have seen this room before. If we knew for certain that there is some
// other active poller B in this room then we could safely skip this logic.

// Log at debug for now. If we find an unknown event, we'll return it so
// that the poller can log a warning.
logger.Debug().Str("room_id", roomID).Int64("snapshot_id", snapshotID).Msg("Accumulator.Initialise called with incremental state but current snapshot already exists.")
eventIDs := make([]string, len(state))
eventIDToRawEvent := make(map[string]json.RawMessage, len(state))
for i := range state {
eventID := gjson.ParseBytes(state[i]).Get("event_id")
if !eventID.Exists() || eventID.Type != gjson.String {
return fmt.Errorf("Event %d lacks an event ID", i)
}
eventIDToRawEvent[eventID.Str] = state[i]
eventIDs[i] = eventID.Str
}
unknownEventIDs, err := a.eventsTable.SelectUnknownEventIDs(txn, eventIDs)
if err != nil {
return fmt.Errorf("error determing which event IDs are unknown: %s", err)
}
for unknownEventID := range unknownEventIDs {
res.PrependTimelineEvents = append(res.PrependTimelineEvents, eventIDToRawEvent[unknownEventID])
}
return nil
}

Expand Down Expand Up @@ -184,28 +223,19 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (bool,
}

// pull out the event NIDs we just inserted
eventIDs := make([]string, len(events))
membershipEventIDs := make(map[string]struct{}, len(events))
for i := range eventIDs {
eventIDs[i] = events[i].ID
if events[i].Type == "m.room.member" {
membershipEventIDs[events[i].ID] = struct{}{}
for _, event := range events {
if event.Type == "m.room.member" {
membershipEventIDs[event.ID] = struct{}{}
}
}
idToNIDs, err := a.eventsTable.SelectNIDsByIDs(txn, eventIDs)
if err != nil {
return fmt.Errorf("failed to select NIDs for inserted events: %w", err)
}
if len(idToNIDs) != len(eventIDs) {
return fmt.Errorf("missing events just inserted, asked for %v got %v", eventIDs, idToNIDs)
}
memberNIDs := make([]int64, 0, len(idToNIDs))
otherNIDs := make([]int64, 0, len(idToNIDs))
for evID, nid := range idToNIDs {
memberNIDs := make([]int64, 0, len(eventIDToNID))
otherNIDs := make([]int64, 0, len(eventIDToNID))
for evID, nid := range eventIDToNID {
if _, exists := membershipEventIDs[evID]; exists {
memberNIDs = append(memberNIDs, nid)
memberNIDs = append(memberNIDs, int64(nid))
} else {
otherNIDs = append(otherNIDs, nid)
otherNIDs = append(otherNIDs, int64(nid))
}
}

Expand All @@ -219,7 +249,7 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (bool,
if err != nil {
return fmt.Errorf("failed to insert snapshot: %w", err)
}
addedEvents = true
res.AddedEvents = true
latestNID := int64(0)
for _, nid := range otherNIDs {
if nid > latestNID {
Expand All @@ -244,10 +274,10 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (bool,
// will have an associated state snapshot ID on the event.

// Set the snapshot ID as the current state
snapID = snapshot.SnapshotID
res.SnapshotID = snapshot.SnapshotID
return a.roomsTable.Upsert(txn, info, snapshot.SnapshotID, latestNID)
})
return addedEvents, snapID, err
return res, err
}

// Accumulate internal state from a user's sync response. The timeline order MUST be in the order
Expand Down
22 changes: 11 additions & 11 deletions state/accumulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ func TestAccumulatorInitialise(t *testing.T) {
db, close := connectToDB(t)
defer close()
accumulator := NewAccumulator(db)
added, initSnapID, err := accumulator.Initialise(roomID, roomEvents)
res, err := accumulator.Initialise(roomID, roomEvents)
if err != nil {
t.Fatalf("falied to Initialise accumulator: %s", err)
}
if !added {
if !res.AddedEvents {
t.Fatalf("didn't add events, wanted it to")
}

Expand All @@ -45,8 +45,8 @@ func TestAccumulatorInitialise(t *testing.T) {
if snapID == 0 {
t.Fatalf("Initialise did not store a current snapshot")
}
if snapID != initSnapID {
t.Fatalf("Initialise returned wrong snapshot ID, got %v want %v", initSnapID, snapID)
if snapID != res.SnapshotID {
t.Fatalf("Initialise returned wrong snapshot ID, got %v want %v", res.SnapshotID, snapID)
}

// this snapshot should have 1 member event and 2 other events in it
Expand Down Expand Up @@ -80,11 +80,11 @@ func TestAccumulatorInitialise(t *testing.T) {
}

// Subsequent calls do nothing and are not an error
added, _, err = accumulator.Initialise(roomID, roomEvents)
res, err = accumulator.Initialise(roomID, roomEvents)
if err != nil {
t.Fatalf("falied to Initialise accumulator: %s", err)
}
if added {
if res.AddedEvents {
t.Fatalf("added events when it shouldn't have")
}
}
Expand All @@ -99,7 +99,7 @@ func TestAccumulatorAccumulate(t *testing.T) {
db, close := connectToDB(t)
defer close()
accumulator := NewAccumulator(db)
_, _, err := accumulator.Initialise(roomID, roomEvents)
_, err := accumulator.Initialise(roomID, roomEvents)
if err != nil {
t.Fatalf("failed to Initialise accumulator: %s", err)
}
Expand Down Expand Up @@ -195,7 +195,7 @@ func TestAccumulatorDelta(t *testing.T) {
db, close := connectToDB(t)
defer close()
accumulator := NewAccumulator(db)
_, _, err := accumulator.Initialise(roomID, nil)
_, err := accumulator.Initialise(roomID, nil)
if err != nil {
t.Fatalf("failed to Initialise accumulator: %s", err)
}
Expand Down Expand Up @@ -244,7 +244,7 @@ func TestAccumulatorMembershipLogs(t *testing.T) {
db, close := connectToDB(t)
defer close()
accumulator := NewAccumulator(db)
_, _, err := accumulator.Initialise(roomID, nil)
_, err := accumulator.Initialise(roomID, nil)
if err != nil {
t.Fatalf("failed to Initialise accumulator: %s", err)
}
Expand Down Expand Up @@ -384,7 +384,7 @@ func TestAccumulatorDupeEvents(t *testing.T) {
defer close()
accumulator := NewAccumulator(db)
roomID := "!buggy:localhost"
_, _, err := accumulator.Initialise(roomID, joinRoom.State.Events)
_, err := accumulator.Initialise(roomID, joinRoom.State.Events)
if err != nil {
t.Fatalf("failed to Initialise accumulator: %s", err)
}
Expand Down Expand Up @@ -426,7 +426,7 @@ func TestAccumulatorMisorderedGraceful(t *testing.T) {
accumulator := NewAccumulator(db)
roomID := "!TestAccumulatorStateReset:localhost"
// Create a room with initial state A,C
_, _, err := accumulator.Initialise(roomID, []json.RawMessage{
_, err := accumulator.Initialise(roomID, []json.RawMessage{
eventA, eventC,
})
if err != nil {
Expand Down
29 changes: 29 additions & 0 deletions state/event_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ func (t *EventTable) SelectHighestNID() (highest int64, err error) {
}

// Insert events into the event table. Returns a map of event ID to NID for new events only.
// The NIDs assigned to new events will respect the order of the given events, e.g. if
// we insert new events A and B in that order, then NID(A) < NID(B).
func (t *EventTable) Insert(txn *sqlx.Tx, events []Event, checkFields bool) (map[string]int, error) {
if checkFields {
ensureFieldsSet(events)
Expand Down Expand Up @@ -211,6 +213,10 @@ func (t *EventTable) SelectByNIDs(txn *sqlx.Tx, verifyAll bool, nids []int64) (e
WHERE event_nid = ANY ($1) ORDER BY event_nid ASC;`, pq.Int64Array(nids))
}

// SelectByIDs fetches all events with the given event IDs from the DB as Event structs.
// If verifyAll is true, the function will check that each event ID has a matching
// event row in the database. The returned events are ordered by ascending NID; the
// order of the event IDs is irrelevant.
func (t *EventTable) SelectByIDs(txn *sqlx.Tx, verifyAll bool, ids []string) (events []Event, err error) {
wanted := 0
if verifyAll {
Expand Down Expand Up @@ -259,6 +265,29 @@ func (t *EventTable) SelectStrippedEventsByIDs(txn *sqlx.Tx, verifyAll bool, ids

}

// SelectUnknownEventIDs accepts a list of event IDs and returns the subset of those which are not known to the DB.
// It MUST be called within a transaction, or else will panic.
func (t *EventTable) SelectUnknownEventIDs(txn *sqlx.Tx, maybeUnknownEventIDs []string) (map[string]struct{}, error) {
// Note: in practice, the order of rows returned matches the order of rows of
// array entries. But I don't think that's guaranteed. Return an (unordered) set
// out of paranoia.
queryStr := `
WITH maybe_unknown_events(event_id) AS (SELECT unnest($1::text[]))
SELECT event_id
FROM maybe_unknown_events LEFT JOIN syncv3_events USING(event_id)
WHERE event_nid IS NULL;`

var unknownEventIDs []string
if err := txn.Select(&unknownEventIDs, queryStr, pq.StringArray(maybeUnknownEventIDs)); err != nil {
return nil, err
}
unknownMap := make(map[string]struct{}, len(unknownEventIDs))
for _, eventID := range unknownEventIDs {
unknownMap[eventID] = struct{}{}
}
return unknownMap, nil
}

// UpdateBeforeSnapshotID sets the before_state_snapshot_id field to `snapID` for the given NIDs.
func (t *EventTable) UpdateBeforeSnapshotID(txn *sqlx.Tx, eventNID, snapID, replacesNID int64) error {
_, err := txn.Exec(
Expand Down
76 changes: 76 additions & 0 deletions state/event_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,3 +870,79 @@ func TestRemoveUnsignedTXNID(t *testing.T) {
}
}
}

func TestEventTableSelectUnknownEventIDs(t *testing.T) {
db, close := connectToDB(t)
defer close()
txn, err := db.Beginx()
if err != nil {
t.Fatalf("failed to start txn: %s", err)
}
defer txn.Rollback()
const roomID = "!1:localhost"

// Note: there shouldn't be any other events with these IDs inserted before this
// transaction. $A and $B seem to be inserted and commit in TestEventTablePrevBatch.
const eventID1 = "$A-SelectUnknownEventIDs"
const eventID2 = "$B-SelectUnknownEventIDs"

knownEvents := []Event{
{
Type: "m.room.create",
StateKey: "",
IsState: true,
ID: eventID1,
RoomID: roomID,
},
{
Type: "m.room.name",
StateKey: "",
IsState: true,
ID: eventID2,
RoomID: roomID,
},
}
table := NewEventTable(db)

// Check the event IDs haven't been added by another test.
gotEvents, err := table.SelectByIDs(txn, true, []string{eventID1, eventID2})
if len(gotEvents) > 0 {
t.Fatalf("Event IDs already in use---commited by another test?")
}

// Insert the events
_, err = table.Insert(txn, knownEvents, false)
if err != nil {
t.Fatalf("failed to insert event: %s", err)
}

gotEvents, err = table.SelectByIDs(txn, true, []string{eventID1, eventID2})
if err != nil {
t.Fatalf("failed to select events: %s", err)
}
if gotEvents[0].ID == eventID1 && gotEvents[1].ID == eventID2 {
t.Logf("Got expected event IDs after insert. NIDS: %s=%d, %s=%d", gotEvents[0].ID, gotEvents[0].NID, gotEvents[1].ID, gotEvents[1].NID)
} else {
t.Fatalf("Event ID mismatch: expected $A-SelectUnknownEventIDs and $B-SelectUnknownEventIDs, got %v", gotEvents)
}

// Someone else tells us the state of the room is {A, C}. Query which of those
// event IDs are unknown.
shouldBeUnknownIDs := []string{"$C-SelectUnknownEventIDs", "$D-SelectUnknownEventIDs"}
stateBlockIDs := append(shouldBeUnknownIDs, eventID1)
unknownIDs, err := table.SelectUnknownEventIDs(txn, stateBlockIDs)
t.Logf("unknownIDs=%v", unknownIDs)
if err != nil {
t.Fatalf("failed to select unknown state events: %s", err)
}

// Event C and D should be flagged as unknown.
if len(unknownIDs) != len(shouldBeUnknownIDs) {
t.Fatalf("Expected %d unknown ids, got %v", len(shouldBeUnknownIDs), unknownIDs)
}
for _, unknownEventID := range shouldBeUnknownIDs {
if _, ok := unknownIDs[unknownEventID]; !ok {
t.Errorf("Expected %s to be unknown to the DB, but it wasn't", unknownEventID)
}
}
}
2 changes: 1 addition & 1 deletion state/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func (s *Storage) Accumulate(roomID, prevBatch string, timeline []json.RawMessag
return s.accumulator.Accumulate(roomID, prevBatch, timeline)
}

func (s *Storage) Initialise(roomID string, state []json.RawMessage) (bool, int64, error) {
func (s *Storage) Initialise(roomID string, state []json.RawMessage) (InitialiseResult, error) {
return s.accumulator.Initialise(roomID, state)
}

Expand Down
6 changes: 3 additions & 3 deletions state/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func TestVisibleEventNIDsBetween(t *testing.T) {
},
}
for roomID, eventMap := range roomIDToEventMap {
_, _, err := store.Initialise(roomID, eventMap)
_, err := store.Initialise(roomID, eventMap)
if err != nil {
t.Fatalf("Initialise on %s failed: %s", roomID, err)
}
Expand Down Expand Up @@ -512,7 +512,7 @@ func TestStorageLatestEventsInRoomsPrevBatch(t *testing.T) {
},
}

_, _, err := store.Initialise(roomID, stateEvents)
_, err := store.Initialise(roomID, stateEvents)
if err != nil {
t.Fatalf("failed to initialise: %s", err)
}
Expand Down Expand Up @@ -616,7 +616,7 @@ func TestGlobalSnapshot(t *testing.T) {
store := NewStorage(postgresConnectionString)
defer store.Teardown()
for roomID, stateEvents := range roomIDToEventMap {
_, _, err := store.Initialise(roomID, stateEvents)
_, err := store.Initialise(roomID, stateEvents)
assertNoError(t, err)
}
snapshot, err := store.GlobalSnapshot()
Expand Down
Loading

0 comments on commit 206ee81

Please sign in to comment.