Skip to content

Commit

Permalink
perf: improve startup speeds by using temp tables
Browse files Browse the repository at this point in the history
When the proxy is run with large DBs (10m+ events), the
startup queries are very slow (around 30min to load the initial snapshot.

After much EXPLAIN ANALYZEing, the cause is due to Postgres' query planner
not making good decisions when the the tables are that large. Specifically,
the startup queries need to pull all joined members in all rooms, which
ends up being nearly 50% of the entire events table of 10m rows. When this
query is embedded in a subselect, the query planner assumes that the subselect
will return only a few rows, and decides to pull those rows via an index. In this
particular case, indexes are the wrong choice, as there are SO MANY rows a Seq Scan
is often more appropriate. By using an index (which is a btree), this ends up doing
log(n) operations _per row_ or `O(0.5 * n * log(n))` assuming we pull 50% of the
table of n rows. As n increases, this is increasingly the wrong call over a basic
O(n) seq scan. When n=10m, a seq scan has a cost of 10m, but using indexes has a
cost of 16.6m. By dumping the result of the subselect to a temporary table, this
allows the query planner to notice that using an index is the wrong thing to do,
resulting in better performance. On large DBs, this decreases the startup time
from 30m to ~5m.
  • Loading branch information
kegsay committed May 18, 2023
1 parent a7f823d commit fa67467
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 22 deletions.
53 changes: 32 additions & 21 deletions state/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,17 +120,37 @@ func (s *Storage) InsertAccountData(userID, roomID string, events []json.RawMess
return data, err
}

// Prepare a snapshot of the database for calling snapshot functions.
func (s *Storage) PrepareSnapshot(txn *sqlx.Tx) (tableName string, err error) {
// create a temporary table with all the membership nids for the current snapshots for all rooms.
// A temporary table will be deleted when the postgres session ends (this process quits).
// We insert these into a temporary table to let the query planner make better decisions. In practice,
// if we instead nest this SELECT as a subselect, we see very poor query times for large tables as
// each event NID is queried using a btree index, rather than doing a seq scan as this query will pull
// out ~50% of the rows in syncv3_events.
tempTableName := "temp_snapshot"
_, err = txn.Exec(
`SELECT UNNEST(membership_events) AS membership_nid INTO TEMP ` + tempTableName + ` FROM syncv3_snapshots
JOIN syncv3_rooms ON syncv3_snapshots.snapshot_id = syncv3_rooms.current_snapshot_id`,
)
return tempTableName, err
}

// GlobalSnapshot snapshots the entire database for the purposes of initialising
// a sliding sync instance. It will atomically grab metadata for all rooms and all joined members
// in a single transaction.
func (s *Storage) GlobalSnapshot() (ss StartupSnapshot, err error) {
err = sqlutil.WithTransaction(s.accumulator.db, func(txn *sqlx.Tx) error {
tempTableName, err := s.PrepareSnapshot(txn)
if err != nil {
return err
}
var metadata map[string]internal.RoomMetadata
ss.AllJoinedMembers, metadata, err = s.AllJoinedMembers(txn)
ss.AllJoinedMembers, metadata, err = s.AllJoinedMembers(txn, tempTableName)
if err != nil {
return err
}
err = s.MetadataForAllRooms(txn, metadata)
err = s.MetadataForAllRooms(txn, tempTableName, metadata)
if err != nil {
return err
}
Expand All @@ -140,16 +160,12 @@ func (s *Storage) GlobalSnapshot() (ss StartupSnapshot, err error) {
return
}

// Extract hero info for all rooms.
func (s *Storage) MetadataForAllRooms(txn *sqlx.Tx, result map[string]internal.RoomMetadata) error {
// Extract hero info for all rooms. Requires a prepared snapshot in order to be called.
func (s *Storage) MetadataForAllRooms(txn *sqlx.Tx, tempTableName string, result map[string]internal.RoomMetadata) error {
// Select the invited member counts
rows, err := txn.Query(`
SELECT room_id, count(state_key) FROM syncv3_events
WHERE (membership='_invite' OR membership = 'invite') AND event_type='m.room.member' AND event_nid IN (
SELECT unnest(membership_events) FROM syncv3_snapshots WHERE syncv3_snapshots.snapshot_id IN (
SELECT current_snapshot_id FROM syncv3_rooms
)
) GROUP BY room_id`)
SELECT room_id, count(state_key) FROM syncv3_events INNER JOIN ` + tempTableName + ` ON membership_nid=event_nid
WHERE (membership='_invite' OR membership = 'invite') AND event_type='m.room.member' GROUP BY room_id`)
if err != nil {
return err
}
Expand Down Expand Up @@ -206,14 +222,10 @@ func (s *Storage) MetadataForAllRooms(txn *sqlx.Tx, result map[string]internal.R
SELECT rf.* FROM (
SELECT room_id, event, rank() OVER (
PARTITION BY room_id ORDER BY event_nid DESC
) FROM syncv3_events WHERE (
) FROM syncv3_events INNER JOIN ` + tempTableName + ` ON membership_nid=event_nid WHERE (
membership='join' OR membership='invite' OR membership='_join'
) AND event_type='m.room.member' AND event_nid IN (
SELECT unnest(membership_events) FROM syncv3_snapshots WHERE syncv3_snapshots.snapshot_id IN (
SELECT current_snapshot_id FROM syncv3_rooms
)
)
) rf WHERE rank <= 6`)
) AND event_type='m.room.member'
) rf WHERE rank <= 6;`)
if err != nil {
return fmt.Errorf("failed to query heroes: %s", err)
}
Expand Down Expand Up @@ -744,11 +756,10 @@ func (s *Storage) RoomMembershipDelta(roomID string, from, to int64, limit int)
return
}

func (s *Storage) AllJoinedMembers(txn *sqlx.Tx) (result map[string][]string, metadata map[string]internal.RoomMetadata, err error) {
// Extract all rooms with joined members, and include the joined user list. Requires a prepared snapshot in order to be called.
func (s *Storage) AllJoinedMembers(txn *sqlx.Tx, tempTableName string) (result map[string][]string, metadata map[string]internal.RoomMetadata, err error) {
rows, err := txn.Query(
`SELECT room_id, state_key from syncv3_events WHERE (membership='join' OR membership='_join') AND event_nid IN (
SELECT UNNEST(membership_events) FROM syncv3_snapshots JOIN syncv3_rooms ON syncv3_snapshots.snapshot_id = syncv3_rooms.current_snapshot_id
) ORDER BY event_nid ASC`,
`SELECT room_id, state_key from ` + tempTableName + ` INNER JOIN syncv3_events on membership_nid = event_nid WHERE membership='join' OR membership='_join' ORDER BY event_nid ASC`,
)
if err != nil {
return nil, nil, err
Expand Down
6 changes: 5 additions & 1 deletion state/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,11 @@ func TestStorageJoinedRoomsAfterPosition(t *testing.T) {
JoinCount: 2,
},
}
err = store.MetadataForAllRooms(txn, roomIDToMetadata)
tempTableName, err := store.PrepareSnapshot(txn)
if err != nil {
t.Fatalf("PrepareSnapshot: %s", err)
}
err = store.MetadataForAllRooms(txn, tempTableName, roomIDToMetadata)
txn.Commit()
if err != nil {
t.Fatalf("MetadataForAllRooms: %s", err)
Expand Down

0 comments on commit fa67467

Please sign in to comment.