Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inject state blocks from incremental polls into the room timeline #71

Merged
merged 40 commits into from
Apr 24, 2023
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
38c1bda
Integration test
Apr 13, 2023
1d59167
E2E test case draft
Apr 13, 2023
46059df
WIP: update test cases
Apr 14, 2023
419e1ab
Update test case again
Apr 17, 2023
4b90454
Don't reselect NIDs after initialising a room
Apr 17, 2023
a2cba6e
We don't need `eventIDs` either
Apr 17, 2023
c19b561
Merge branch 'dmr/dont-select-after-insert-returning' into dmr/gappy-…
Apr 17, 2023
666823d
Introduce return struct for Initialise
Apr 17, 2023
4ba80d2
Initialise: handle state blocks from a gappy sync
Apr 17, 2023
9fdb001
TODO comment
Apr 17, 2023
33b174d
Fixup test code
Apr 17, 2023
ac9651c
Propagate err message
Apr 17, 2023
5d8560c
Fix db query
Apr 17, 2023
b7a8e7d
Improve logging
Apr 17, 2023
e3008e6
Fix the logging
Apr 17, 2023
2406da5
TODO note
Apr 17, 2023
f28f7d0
Return set of unknown events from the db
Apr 18, 2023
9d53a87
Simply Initialise to bail out early
Apr 18, 2023
2bae784
Propagate prepend events to poller, and inject
Apr 18, 2023
5621423
Fix tests
Apr 18, 2023
9af0471
Always look for unknown state events
Apr 18, 2023
b5893b1
Better error checking in test
Apr 18, 2023
aa07188
Fixup test
Apr 18, 2023
7f03a3d
Use test helper instead
Apr 19, 2023
e6aac43
SelectUnknownEventIDs expects a txn
Apr 19, 2023
b54ba7c
Poller test description
Apr 19, 2023
5dd4315
Test new events table query
Apr 19, 2023
76066f9
Tidy up accumulator event ID handling
Apr 19, 2023
5a9fae1
Fix capitalisation
Apr 19, 2023
b32e5da
Fix test function args too
Apr 19, 2023
00dd396
Fix MatchRoomTimelineMostRecent
Apr 19, 2023
bea931d
Fix running unit test alongside other tests
Apr 19, 2023
1680952
Tidyup unit test
Apr 19, 2023
6e7d0cb
Test memberships are updated after gappy sync
Apr 19, 2023
aa09d12
Check the new query reports multiple unknown IDs
Apr 21, 2023
8324f6e
SelectByIDs has an ordering guarantee
Apr 21, 2023
6c9aa09
Pass room ID to new query
Apr 24, 2023
2755384
Revert "Pass room ID to new query"
Apr 24, 2023
82d1d58
Update integration test
Apr 24, 2023
3274cf2
typo
Apr 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 55 additions & 29 deletions state/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,20 +131,37 @@ func (a *Accumulator) roomInfoDelta(roomID string, events []Event) RoomInfo {
}
}

type InitialiseResult struct {
// AddedEvents is true iff this call to Initialise added new state events to the DB.
AddedEvents bool
// SnapshotID is the ID of the snapshot which incorporates all added events.
// It has no meaning if AddedEvents is false.
SnapshotID int64
// PrependTimelineEvents is empty if the room was not initialised prior to this call.
// Otherwise, it is an order-preserving subset of the `state` argument to Initialise
// containing all events that were not persisted prior to the Initialise call. These
// should be prepended to the room timeline by the caller.
PrependTimelineEvents []json.RawMessage
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
}

// Initialise starts a new sync accumulator for the given room using the given state as a baseline.
// This will only take effect if this is the first time the v3 server has seen this room, and it wasn't
// possible to get all events up to the create event (e.g Matrix HQ). Returns true if this call actually
// added new events, along with the snapshot NID.
//
// This will only take effect if this is the first time the v3 server has seen this room, and it wasn't
// possible to get all events up to the create event (e.g Matrix HQ).
// This function:
// - Stores these events
// - Sets up the current snapshot based on the state list given.
func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (bool, int64, error) {
//
// If the v3 server has seen this room before, this function
// - queries the DB to determine which state events are known to the server,
// - returns (via InitialiseResult.PrependTimelineEvents) a slice of unknown state events,
//
// and otherwise does nothing.
func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (InitialiseResult, error) {
var res InitialiseResult
if len(state) == 0 {
return false, 0, nil
return res, nil
}
addedEvents := false
var snapID int64
err := sqlutil.WithTransaction(a.db, func(txn *sqlx.Tx) error {
// Attempt to short-circuit. This has to be done inside a transaction to make sure
// we don't race with multiple calls to Initialise with the same room ID.
Expand All @@ -153,8 +170,26 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (bool,
return fmt.Errorf("error fetching snapshot id for room %s: %s", roomID, err)
}
if snapshotID > 0 {
// we only initialise rooms once
logger.Info().Str("room_id", roomID).Int64("snapshot_id", snapshotID).Msg("Accumulator.Initialise called but current snapshot already exists, bailing early")
// Poller A has received a gappy sync v2 response with a state block, and
// we have seen this room before. If we knew for certain that there is some
// other active poller B in this room then we could safely skip this logic.

// Log at debug for now. If we find an unknown event, we'll return it so
// that the poller can log a warning.
logger.Debug().Str("room_id", roomID).Int64("snapshot_id", snapshotID).Msg("Accumulator.Initialise called with incremental state but current snapshot already exists.")
eventIDs := make([]string, len(state))
for i := range state {
eventIDs[i] = gjson.ParseBytes(state[i]).Get("event_id").Str
}
unknownEventIDs, err := a.eventsTable.SelectUnknownEventIDs(txn, eventIDs)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be improved if we made use of the room ID as it can cut lookup times down a lot.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I'd just assumed this was indexed on event id---maybe we should add that index?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Err, the event_id column is UNIQUE, so an index on it will already exist. Not sure why the room ID would help here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The room ID can help as it cuts down the dataset immediately, rather than relying on the mahoosive event ID index of all events ever. In the past it has been beneficial to include this information when running EXPLAIN.

Copy link
Contributor Author

@DMRobertson DMRobertson Apr 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remain sceptical.

-- Matrix HQ
\set room_id '!OGEhHVWSdvArJzumhm:matrix.org'
-- 20 event Ids taken from Matrix HQ, plus two fake event IDs
\set event_ids {$fakeevent1,$4jteAg_T0BMXWiSRzZbptOF9zETeomXbJoDtg6zs9SU,$Iu-74VzJ0B4Xc6XgBDAKoWYZkFcUGCuJ_vLd47Cf4Ok,$OIqjRnK_rXBApl_8x4s0Iq--OH969cikkvjGYW5Seq8,$7EYkESAKR-t6wKLG7rjPyyVcZdQ-v0YQtaLbzrLJ2vI,$6LYIKlvAlC1ILMhIsWx-M9IDOarFBLM_Ni609bz1NDI,$w1sLe53ZqsRc9vebfLS8JCUyq3ql9DRhN-lERWsZRHQ,$A6k3YCykEOHCzsbzxQjdupwNdt4hr4AeS6IWOnEG1H4,$K1EGNY8C-McIJJ5PBbd1GlUFMuw84S_QlbomqnYMiYY,$QgyACr1TujWs846B5tiAkbM9JkBEzUoMK6QgOfXDc64,$f2aefO_6yYMhSFRtQWYkTDdie87ldgTJD1h3JQ2F_3Q,$pYU_XgUXF0kQXvtSjG0boNdMHtXGBIFfwBDqxZuU1IE,$cZ5XF2o7ObQqROkJZWSubgm4cLgBX8j1nkrcrHv0rZ0,$wXTQjHKMTsMBqj3B4BLN40L0-7tz1NbG8WesWgGcjwY,$csN_i4UoBTk5jFzdDAxnKSmKg0Hp6U7Af_ifht07D-o,$NBsWMM0MJ_XcAt9gk9ZJxV3_v3tPfvmZ1azHI3Wb31A,$bf1IpaCxsC2Te4mWrBXcJZqesoh1HsssdaI_U1OzXuQ,$nsLyOZZRywB1bGwEVNR3qQFLuY3hGOuM-26J6IAvsgk,$-b0T_rqr5-vE1FiqwYOriXkTE0c8oMXj0a9uSeMnWVw,$qx3qMhu6ba_ceFcv9ADA5J45CTUhhAMDz44eOVmJwR8,$ugwvSmk0LfVLpxk0cloQ6tQxjacdp0ojurri3pN4bl4,$notanevent2}

Query without room ID:

EXPLAIN ANALYZE VERBOSE
WITH maybe_unknown_events(event_id) AS (SELECT unnest(:'event_ids'::text[]))
	SELECT *
	FROM maybe_unknown_events LEFT JOIN syncv3_events USING(event_id)
	WHERE event_nid IS NULL;
                                         QUERY PLAN                                                                                                                                                                                                                                         
                                                                                                                                                                                                                                                                                            
                                                                                            
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------
 Nested Loop Left Join  (cost=0.56..189.27 rows=1 width=648) (actual time=0.030..0.239 rows=2 loops=1)
   Output: (unnest('{$fakeevent1,$4jteAg_T0BMXWiSRzZbptOF9zETeomXbJoDtg6zs9SU,$Iu-74VzJ0B4Xc6XgBDAKoWYZkFcUGCuJ_vLd47Cf4Ok,$OIqjRnK_rXBApl_8x4s0Iq--OH969cikkvjGYW5Seq8,$7EYkESAKR-t6wKLG7rjPyyVcZdQ-v0YQtaLbzrLJ2vI,$6LYIKlvAlC1ILMhIsWx-M9IDOarFBLM_Ni609bz1NDI,$w1sLe53ZqsRc9vebfLS8JCUyq
3ql9DRhN-lERWsZRHQ,$A6k3YCykEOHCzsbzxQjdupwNdt4hr4AeS6IWOnEG1H4,$K1EGNY8C-McIJJ5PBbd1GlUFMuw84S_QlbomqnYMiYY,$QgyACr1TujWs846B5tiAkbM9JkBEzUoMK6QgOfXDc64,$f2aefO_6yYMhSFRtQWYkTDdie87ldgTJD1h3JQ2F_3Q,$pYU_XgUXF0kQXvtSjG0boNdMHtXGBIFfwBDqxZuU1IE,$cZ5XF2o7ObQqROkJZWSubgm4cLgBX8j1nkrcrHv
0rZ0,$wXTQjHKMTsMBqj3B4BLN40L0-7tz1NbG8WesWgGcjwY,$csN_i4UoBTk5jFzdDAxnKSmKg0Hp6U7Af_ifht07D-o,$NBsWMM0MJ_XcAt9gk9ZJxV3_v3tPfvmZ1azHI3Wb31A,$bf1IpaCxsC2Te4mWrBXcJZqesoh1HsssdaI_U1OzXuQ,$nsLyOZZRywB1bGwEVNR3qQFLuY3hGOuM-26J6IAvsgk,$-b0T_rqr5-vE1FiqwYOriXkTE0c8oMXj0a9uSeMnWVw,$qx3qMhu6
ba_ceFcv9ADA5J45CTUhhAMDz44eOVmJwR8,$ugwvSmk0LfVLpxk0cloQ6tQxjacdp0ojurri3pN4bl4,$notanevent2}'::text[])), syncv3_events.event_nid, syncv3_events.before_state_snapshot_id, syncv3_events.event_replaces_nid, syncv3_events.room_id, syncv3_events.event_type, syncv3_events.state_key, sync
v3_events.prev_batch, syncv3_events.membership, syncv3_events.is_state, syncv3_events.event
   Inner Unique: true
   Filter: (syncv3_events.event_nid IS NULL)
   Rows Removed by Filter: 20
   ->  ProjectSet  (cost=0.00..0.13 rows=22 width=32) (actual time=0.003..0.007 rows=22 loops=1)
         Output: unnest('{$fakeevent1,$4jteAg_T0BMXWiSRzZbptOF9zETeomXbJoDtg6zs9SU,$Iu-74VzJ0B4Xc6XgBDAKoWYZkFcUGCuJ_vLd47Cf4Ok,$OIqjRnK_rXBApl_8x4s0Iq--OH969cikkvjGYW5Seq8,$7EYkESAKR-t6wKLG7rjPyyVcZdQ-v0YQtaLbzrLJ2vI,$6LYIKlvAlC1ILMhIsWx-M9IDOarFBLM_Ni609bz1NDI,$w1sLe53ZqsRc9vebfLS8
JCUyq3ql9DRhN-lERWsZRHQ,$A6k3YCykEOHCzsbzxQjdupwNdt4hr4AeS6IWOnEG1H4,$K1EGNY8C-McIJJ5PBbd1GlUFMuw84S_QlbomqnYMiYY,$QgyACr1TujWs846B5tiAkbM9JkBEzUoMK6QgOfXDc64,$f2aefO_6yYMhSFRtQWYkTDdie87ldgTJD1h3JQ2F_3Q,$pYU_XgUXF0kQXvtSjG0boNdMHtXGBIFfwBDqxZuU1IE,$cZ5XF2o7ObQqROkJZWSubgm4cLgBX8j1nk
rcrHv0rZ0,$wXTQjHKMTsMBqj3B4BLN40L0-7tz1NbG8WesWgGcjwY,$csN_i4UoBTk5jFzdDAxnKSmKg0Hp6U7Af_ifht07D-o,$NBsWMM0MJ_XcAt9gk9ZJxV3_v3tPfvmZ1azHI3Wb31A,$bf1IpaCxsC2Te4mWrBXcJZqesoh1HsssdaI_U1OzXuQ,$nsLyOZZRywB1bGwEVNR3qQFLuY3hGOuM-26J6IAvsgk,$-b0T_rqr5-vE1FiqwYOriXkTE0c8oMXj0a9uSeMnWVw,$qx3
qMhu6ba_ceFcv9ADA5J45CTUhhAMDz44eOVmJwR8,$ugwvSmk0LfVLpxk0cloQ6tQxjacdp0ojurri3pN4bl4,$notanevent2}'::text[])
         ->  Result  (cost=0.00..0.01 rows=1 width=0) (actual time=0.001..0.001 rows=1 loops=1)
   ->  Index Scan using syncv3_events_event_id_key on public.syncv3_events  (cost=0.56..8.58 rows=1 width=658) (actual time=0.010..0.010 rows=1 loops=22)
         Output: syncv3_events.event_nid, syncv3_events.event_id, syncv3_events.before_state_snapshot_id, syncv3_events.event_replaces_nid, syncv3_events.room_id, syncv3_events.event_type, syncv3_events.state_key, syncv3_events.prev_batch, syncv3_events.membership, syncv3_events.is_s
tate, syncv3_events.event
         Index Cond: (syncv3_events.event_id = (unnest('{$fakeevent1,$4jteAg_T0BMXWiSRzZbptOF9zETeomXbJoDtg6zs9SU,$Iu-74VzJ0B4Xc6XgBDAKoWYZkFcUGCuJ_vLd47Cf4Ok,$OIqjRnK_rXBApl_8x4s0Iq--OH969cikkvjGYW5Seq8,$7EYkESAKR-t6wKLG7rjPyyVcZdQ-v0YQtaLbzrLJ2vI,$6LYIKlvAlC1ILMhIsWx-M9IDOarFBLM_Ni
609bz1NDI,$w1sLe53ZqsRc9vebfLS8JCUyq3ql9DRhN-lERWsZRHQ,$A6k3YCykEOHCzsbzxQjdupwNdt4hr4AeS6IWOnEG1H4,$K1EGNY8C-McIJJ5PBbd1GlUFMuw84S_QlbomqnYMiYY,$QgyACr1TujWs846B5tiAkbM9JkBEzUoMK6QgOfXDc64,$f2aefO_6yYMhSFRtQWYkTDdie87ldgTJD1h3JQ2F_3Q,$pYU_XgUXF0kQXvtSjG0boNdMHtXGBIFfwBDqxZuU1IE,$cZ5
XF2o7ObQqROkJZWSubgm4cLgBX8j1nkrcrHv0rZ0,$wXTQjHKMTsMBqj3B4BLN40L0-7tz1NbG8WesWgGcjwY,$csN_i4UoBTk5jFzdDAxnKSmKg0Hp6U7Af_ifht07D-o,$NBsWMM0MJ_XcAt9gk9ZJxV3_v3tPfvmZ1azHI3Wb31A,$bf1IpaCxsC2Te4mWrBXcJZqesoh1HsssdaI_U1OzXuQ,$nsLyOZZRywB1bGwEVNR3qQFLuY3hGOuM-26J6IAvsgk,$-b0T_rqr5-vE1Fiqw
YOriXkTE0c8oMXj0a9uSeMnWVw,$qx3qMhu6ba_ceFcv9ADA5J45CTUhhAMDz44eOVmJwR8,$ugwvSmk0LfVLpxk0cloQ6tQxjacdp0ojurri3pN4bl4,$notanevent2}'::text[])))
 Query Identifier: -467502754092800539
 Planning Time: 0.142 ms
 Execution Time: 0.264 ms

Query also checking room ID (note it is not just a case of appending WHERE room_id = ...):

EXPLAIN ANALYZE VERBOSE
WITH maybe_unknown_events(event_id) AS (SELECT unnest(:'event_ids'::text[]))
	SELECT *
	FROM maybe_unknown_events LEFT JOIN syncv3_events ON maybe_unknown_events.event_id = syncv3_events.event_id AND room_id = :'room_id'
	WHERE event_nid IS NULL;
	                                                     QUERY PLAN                                                                                                                                                                                                                             
                                                                                                                                                                                                                                                                                            
                                                                                                                    
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------
 Nested Loop Left Join  (cost=0.56..189.33 rows=1 width=690) (actual time=0.053..0.262 rows=2 loops=1)
   Output: (unnest('{$fakeevent1,$4jteAg_T0BMXWiSRzZbptOF9zETeomXbJoDtg6zs9SU,$Iu-74VzJ0B4Xc6XgBDAKoWYZkFcUGCuJ_vLd47Cf4Ok,$OIqjRnK_rXBApl_8x4s0Iq--OH969cikkvjGYW5Seq8,$7EYkESAKR-t6wKLG7rjPyyVcZdQ-v0YQtaLbzrLJ2vI,$6LYIKlvAlC1ILMhIsWx-M9IDOarFBLM_Ni609bz1NDI,$w1sLe53ZqsRc9vebfLS8JCUyq
3ql9DRhN-lERWsZRHQ,$A6k3YCykEOHCzsbzxQjdupwNdt4hr4AeS6IWOnEG1H4,$K1EGNY8C-McIJJ5PBbd1GlUFMuw84S_QlbomqnYMiYY,$QgyACr1TujWs846B5tiAkbM9JkBEzUoMK6QgOfXDc64,$f2aefO_6yYMhSFRtQWYkTDdie87ldgTJD1h3JQ2F_3Q,$pYU_XgUXF0kQXvtSjG0boNdMHtXGBIFfwBDqxZuU1IE,$cZ5XF2o7ObQqROkJZWSubgm4cLgBX8j1nkrcrHv
0rZ0,$wXTQjHKMTsMBqj3B4BLN40L0-7tz1NbG8WesWgGcjwY,$csN_i4UoBTk5jFzdDAxnKSmKg0Hp6U7Af_ifht07D-o,$NBsWMM0MJ_XcAt9gk9ZJxV3_v3tPfvmZ1azHI3Wb31A,$bf1IpaCxsC2Te4mWrBXcJZqesoh1HsssdaI_U1OzXuQ,$nsLyOZZRywB1bGwEVNR3qQFLuY3hGOuM-26J6IAvsgk,$-b0T_rqr5-vE1FiqwYOriXkTE0c8oMXj0a9uSeMnWVw,$qx3qMhu6
ba_ceFcv9ADA5J45CTUhhAMDz44eOVmJwR8,$ugwvSmk0LfVLpxk0cloQ6tQxjacdp0ojurri3pN4bl4,$notanevent2}'::text[])), syncv3_events.event_nid, syncv3_events.event_id, syncv3_events.before_state_snapshot_id, syncv3_events.event_replaces_nid, syncv3_events.room_id, syncv3_events.event_type, syncv
3_events.state_key, syncv3_events.prev_batch, syncv3_events.membership, syncv3_events.is_state, syncv3_events.event
   Inner Unique: true
   Filter: (syncv3_events.event_nid IS NULL)
   Rows Removed by Filter: 20
   ->  ProjectSet  (cost=0.00..0.13 rows=22 width=32) (actual time=0.003..0.006 rows=22 loops=1)
         Output: unnest('{$fakeevent1,$4jteAg_T0BMXWiSRzZbptOF9zETeomXbJoDtg6zs9SU,$Iu-74VzJ0B4Xc6XgBDAKoWYZkFcUGCuJ_vLd47Cf4Ok,$OIqjRnK_rXBApl_8x4s0Iq--OH969cikkvjGYW5Seq8,$7EYkESAKR-t6wKLG7rjPyyVcZdQ-v0YQtaLbzrLJ2vI,$6LYIKlvAlC1ILMhIsWx-M9IDOarFBLM_Ni609bz1NDI,$w1sLe53ZqsRc9vebfLS8
JCUyq3ql9DRhN-lERWsZRHQ,$A6k3YCykEOHCzsbzxQjdupwNdt4hr4AeS6IWOnEG1H4,$K1EGNY8C-McIJJ5PBbd1GlUFMuw84S_QlbomqnYMiYY,$QgyACr1TujWs846B5tiAkbM9JkBEzUoMK6QgOfXDc64,$f2aefO_6yYMhSFRtQWYkTDdie87ldgTJD1h3JQ2F_3Q,$pYU_XgUXF0kQXvtSjG0boNdMHtXGBIFfwBDqxZuU1IE,$cZ5XF2o7ObQqROkJZWSubgm4cLgBX8j1nk
rcrHv0rZ0,$wXTQjHKMTsMBqj3B4BLN40L0-7tz1NbG8WesWgGcjwY,$csN_i4UoBTk5jFzdDAxnKSmKg0Hp6U7Af_ifht07D-o,$NBsWMM0MJ_XcAt9gk9ZJxV3_v3tPfvmZ1azHI3Wb31A,$bf1IpaCxsC2Te4mWrBXcJZqesoh1HsssdaI_U1OzXuQ,$nsLyOZZRywB1bGwEVNR3qQFLuY3hGOuM-26J6IAvsgk,$-b0T_rqr5-vE1FiqwYOriXkTE0c8oMXj0a9uSeMnWVw,$qx3
qMhu6ba_ceFcv9ADA5J45CTUhhAMDz44eOVmJwR8,$ugwvSmk0LfVLpxk0cloQ6tQxjacdp0ojurri3pN4bl4,$notanevent2}'::text[])
         ->  Result  (cost=0.00..0.01 rows=1 width=0) (actual time=0.001..0.001 rows=1 loops=1)
   ->  Index Scan using syncv3_events_event_id_key on public.syncv3_events  (cost=0.56..8.58 rows=1 width=658) (actual time=0.011..0.011 rows=1 loops=22)
         Output: syncv3_events.event_nid, syncv3_events.event_id, syncv3_events.before_state_snapshot_id, syncv3_events.event_replaces_nid, syncv3_events.room_id, syncv3_events.event_type, syncv3_events.state_key, syncv3_events.prev_batch, syncv3_events.membership, syncv3_events.is_s
tate, syncv3_events.event
         Index Cond: (syncv3_events.event_id = (unnest('{$fakeevent1,$4jteAg_T0BMXWiSRzZbptOF9zETeomXbJoDtg6zs9SU,$Iu-74VzJ0B4Xc6XgBDAKoWYZkFcUGCuJ_vLd47Cf4Ok,$OIqjRnK_rXBApl_8x4s0Iq--OH969cikkvjGYW5Seq8,$7EYkESAKR-t6wKLG7rjPyyVcZdQ-v0YQtaLbzrLJ2vI,$6LYIKlvAlC1ILMhIsWx-M9IDOarFBLM_Ni
609bz1NDI,$w1sLe53ZqsRc9vebfLS8JCUyq3ql9DRhN-lERWsZRHQ,$A6k3YCykEOHCzsbzxQjdupwNdt4hr4AeS6IWOnEG1H4,$K1EGNY8C-McIJJ5PBbd1GlUFMuw84S_QlbomqnYMiYY,$QgyACr1TujWs846B5tiAkbM9JkBEzUoMK6QgOfXDc64,$f2aefO_6yYMhSFRtQWYkTDdie87ldgTJD1h3JQ2F_3Q,$pYU_XgUXF0kQXvtSjG0boNdMHtXGBIFfwBDqxZuU1IE,$cZ5
XF2o7ObQqROkJZWSubgm4cLgBX8j1nkrcrHv0rZ0,$wXTQjHKMTsMBqj3B4BLN40L0-7tz1NbG8WesWgGcjwY,$csN_i4UoBTk5jFzdDAxnKSmKg0Hp6U7Af_ifht07D-o,$NBsWMM0MJ_XcAt9gk9ZJxV3_v3tPfvmZ1azHI3Wb31A,$bf1IpaCxsC2Te4mWrBXcJZqesoh1HsssdaI_U1OzXuQ,$nsLyOZZRywB1bGwEVNR3qQFLuY3hGOuM-26J6IAvsgk,$-b0T_rqr5-vE1Fiqw
YOriXkTE0c8oMXj0a9uSeMnWVw,$qx3qMhu6ba_ceFcv9ADA5J45CTUhhAMDz44eOVmJwR8,$ugwvSmk0LfVLpxk0cloQ6tQxjacdp0ojurri3pN4bl4,$notanevent2}'::text[])))
         Filter: (syncv3_events.room_id = '!OGEhHVWSdvArJzumhm:matrix.org'::text)
 Query Identifier: 2484314538527637739
 Planning Time: 0.163 ms
 Execution Time: 0.285 ms

AFAICS the only difference is an additional

     Filter: (syncv3_events.room_id = '!OGEhHVWSdvArJzumhm:matrix.org'::text)

at the bottom of the second query plan, which removes no rows anyway. The time diff (~+40 microseconds) is positive. (Though this isn't a statistical analysis; that diff may be just noise.)

if err != nil {
return fmt.Errorf("error determing which event IDs are unknown: %s", err)
}
for i := range state {
if _, unknown := unknownEventIDs[eventIDs[i]]; unknown {
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
res.PrependTimelineEvents = append(res.PrependTimelineEvents, state[i])
}
}
return nil
}

Expand Down Expand Up @@ -184,28 +219,19 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (bool,
}

// pull out the event NIDs we just inserted
eventIDs := make([]string, len(events))
membershipEventIDs := make(map[string]struct{}, len(events))
for i := range eventIDs {
eventIDs[i] = events[i].ID
if events[i].Type == "m.room.member" {
membershipEventIDs[events[i].ID] = struct{}{}
for _, event := range events {
if event.Type == "m.room.member" {
membershipEventIDs[event.ID] = struct{}{}
}
}
idToNIDs, err := a.eventsTable.SelectNIDsByIDs(txn, eventIDs)
if err != nil {
return fmt.Errorf("failed to select NIDs for inserted events: %w", err)
}
if len(idToNIDs) != len(eventIDs) {
return fmt.Errorf("missing events just inserted, asked for %v got %v", eventIDs, idToNIDs)
}
memberNIDs := make([]int64, 0, len(idToNIDs))
otherNIDs := make([]int64, 0, len(idToNIDs))
for evID, nid := range idToNIDs {
memberNIDs := make([]int64, 0, len(eventIDToNID))
otherNIDs := make([]int64, 0, len(eventIDToNID))
for evID, nid := range eventIDToNID {
if _, exists := membershipEventIDs[evID]; exists {
memberNIDs = append(memberNIDs, nid)
memberNIDs = append(memberNIDs, int64(nid))
} else {
otherNIDs = append(otherNIDs, nid)
otherNIDs = append(otherNIDs, int64(nid))
}
}

Expand All @@ -219,7 +245,7 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (bool,
if err != nil {
return fmt.Errorf("failed to insert snapshot: %w", err)
}
addedEvents = true
res.AddedEvents = true
latestNID := int64(0)
for _, nid := range otherNIDs {
if nid > latestNID {
Expand All @@ -244,10 +270,10 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (bool,
// will have an associated state snapshot ID on the event.

// Set the snapshot ID as the current state
snapID = snapshot.SnapshotID
res.SnapshotID = snapshot.SnapshotID
return a.roomsTable.Upsert(txn, info, snapshot.SnapshotID, latestNID)
})
return addedEvents, snapID, err
return res, err
}

// Accumulate internal state from a user's sync response. The timeline order MUST be in the order
Expand Down
22 changes: 11 additions & 11 deletions state/accumulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ func TestAccumulatorInitialise(t *testing.T) {
db, close := connectToDB(t)
defer close()
accumulator := NewAccumulator(db)
added, initSnapID, err := accumulator.Initialise(roomID, roomEvents)
res, err := accumulator.Initialise(roomID, roomEvents)
if err != nil {
t.Fatalf("falied to Initialise accumulator: %s", err)
}
if !added {
if !res.AddedEvents {
t.Fatalf("didn't add events, wanted it to")
}

Expand All @@ -45,8 +45,8 @@ func TestAccumulatorInitialise(t *testing.T) {
if snapID == 0 {
t.Fatalf("Initialise did not store a current snapshot")
}
if snapID != initSnapID {
t.Fatalf("Initialise returned wrong snapshot ID, got %v want %v", initSnapID, snapID)
if snapID != res.SnapshotID {
t.Fatalf("Initialise returned wrong snapshot ID, got %v want %v", res.SnapshotID, snapID)
}

// this snapshot should have 1 member event and 2 other events in it
Expand Down Expand Up @@ -80,11 +80,11 @@ func TestAccumulatorInitialise(t *testing.T) {
}

// Subsequent calls do nothing and are not an error
added, _, err = accumulator.Initialise(roomID, roomEvents)
res, err = accumulator.Initialise(roomID, roomEvents)
if err != nil {
t.Fatalf("falied to Initialise accumulator: %s", err)
}
if added {
if res.AddedEvents {
t.Fatalf("added events when it shouldn't have")
}
}
Expand All @@ -99,7 +99,7 @@ func TestAccumulatorAccumulate(t *testing.T) {
db, close := connectToDB(t)
defer close()
accumulator := NewAccumulator(db)
_, _, err := accumulator.Initialise(roomID, roomEvents)
_, err := accumulator.Initialise(roomID, roomEvents)
if err != nil {
t.Fatalf("failed to Initialise accumulator: %s", err)
}
Expand Down Expand Up @@ -195,7 +195,7 @@ func TestAccumulatorDelta(t *testing.T) {
db, close := connectToDB(t)
defer close()
accumulator := NewAccumulator(db)
_, _, err := accumulator.Initialise(roomID, nil)
_, err := accumulator.Initialise(roomID, nil)
if err != nil {
t.Fatalf("failed to Initialise accumulator: %s", err)
}
Expand Down Expand Up @@ -244,7 +244,7 @@ func TestAccumulatorMembershipLogs(t *testing.T) {
db, close := connectToDB(t)
defer close()
accumulator := NewAccumulator(db)
_, _, err := accumulator.Initialise(roomID, nil)
_, err := accumulator.Initialise(roomID, nil)
if err != nil {
t.Fatalf("failed to Initialise accumulator: %s", err)
}
Expand Down Expand Up @@ -384,7 +384,7 @@ func TestAccumulatorDupeEvents(t *testing.T) {
defer close()
accumulator := NewAccumulator(db)
roomID := "!buggy:localhost"
_, _, err := accumulator.Initialise(roomID, joinRoom.State.Events)
_, err := accumulator.Initialise(roomID, joinRoom.State.Events)
if err != nil {
t.Fatalf("failed to Initialise accumulator: %s", err)
}
Expand Down Expand Up @@ -426,7 +426,7 @@ func TestAccumulatorMisorderedGraceful(t *testing.T) {
accumulator := NewAccumulator(db)
roomID := "!TestAccumulatorStateReset:localhost"
// Create a room with initial state A,C
_, _, err := accumulator.Initialise(roomID, []json.RawMessage{
_, err := accumulator.Initialise(roomID, []json.RawMessage{
eventA, eventC,
})
if err != nil {
Expand Down
28 changes: 28 additions & 0 deletions state/event_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,34 @@ func (t *EventTable) SelectStrippedEventsByIDs(txn *sqlx.Tx, verifyAll bool, ids

}

// SelectUnknownEventIDs accepts a list of event IDs and returns the subset of those which are not known to the DB.
func (t *EventTable) SelectUnknownEventIDs(txn *sqlx.Tx, maybeUnknownEventIDs []string) (map[string]struct{}, error) {
// Note: in practice, the order of the rows returned matches the order of the
// input array entries. But I don't think that's guaranteed. Return an (unordered)
// set out of paranoia.
queryStr := `
WITH maybe_unknown_events(event_id) AS (SELECT unnest($1::text[]))
SELECT event_id
FROM maybe_unknown_events LEFT JOIN syncv3_events USING(event_id)
WHERE event_nid IS NULL;`
Comment on lines +275 to +278
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did test this in a postgres playground, but I think this might be worth a unittest.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please do. There is event_table_test.go to do that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't use this style of query anywhere in the proxy atm. I'm guessing you do this so you can "select then negate" all on the DB side? I'd expected to see something more like:

SELECT event_id FROM syncv3_events WHERE room_id=$1 AND event_id=ANY($2)

but then you'd need to negate on the Go side to find the missing ones. I wonder which is better.

Copy link
Contributor Author

@DMRobertson DMRobertson Apr 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the thinking was basically to get the DB to tell me which events are unknown, rather than getting it to tell me which events are known and then computing the complement. (I'm assuming that we know about most state events, so we may as well get the DB to transmit the smaller set back to us.)

I don't think it makes a huge difference if the state block and room are small. I'm slightly worried about hitting this query for Matrix HQ with a state block from a new poller's initial sync.


var unknownEventIDs []string
var err error
if txn != nil {
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
err = txn.Select(&unknownEventIDs, queryStr, pq.StringArray(maybeUnknownEventIDs))
} else {
err = t.db.Select(&unknownEventIDs, queryStr, pq.StringArray(maybeUnknownEventIDs))
}
if err != nil {
return nil, err
}
unknownMap := make(map[string]struct{}, len(unknownEventIDs))
for _, eventID := range unknownEventIDs {
unknownMap[eventID] = struct{}{}
}
return unknownMap, nil
}

// UpdateBeforeSnapshotID sets the before_state_snapshot_id field to `snapID` for the given NIDs.
func (t *EventTable) UpdateBeforeSnapshotID(txn *sqlx.Tx, eventNID, snapID, replacesNID int64) error {
_, err := txn.Exec(
Expand Down
2 changes: 1 addition & 1 deletion state/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func (s *Storage) Accumulate(roomID, prevBatch string, timeline []json.RawMessag
return s.accumulator.Accumulate(roomID, prevBatch, timeline)
}

func (s *Storage) Initialise(roomID string, state []json.RawMessage) (bool, int64, error) {
func (s *Storage) Initialise(roomID string, state []json.RawMessage) (InitialiseResult, error) {
return s.accumulator.Initialise(roomID, state)
}

Expand Down
6 changes: 3 additions & 3 deletions state/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func TestVisibleEventNIDsBetween(t *testing.T) {
},
}
for roomID, eventMap := range roomIDToEventMap {
_, _, err := store.Initialise(roomID, eventMap)
_, err := store.Initialise(roomID, eventMap)
if err != nil {
t.Fatalf("Initialise on %s failed: %s", roomID, err)
}
Expand Down Expand Up @@ -512,7 +512,7 @@ func TestStorageLatestEventsInRoomsPrevBatch(t *testing.T) {
},
}

_, _, err := store.Initialise(roomID, stateEvents)
_, err := store.Initialise(roomID, stateEvents)
if err != nil {
t.Fatalf("failed to initialise: %s", err)
}
Expand Down Expand Up @@ -616,7 +616,7 @@ func TestGlobalSnapshot(t *testing.T) {
store := NewStorage(postgresConnectionString)
defer store.Teardown()
for roomID, stateEvents := range roomIDToEventMap {
_, _, err := store.Initialise(roomID, stateEvents)
_, err := store.Initialise(roomID, stateEvents)
assertNoError(t, err)
}
snapshot, err := store.GlobalSnapshot()
Expand Down
19 changes: 9 additions & 10 deletions sync2/handler2/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,21 +240,20 @@ func (h *Handler) Accumulate(deviceID, roomID, prevBatch string, timeline []json
})
}

// Initialise passes the state block for roomID to h.Store.Initialise.
// If the store returns an error, the error is logged and reported to sentry and the
// function returns early (nil in the variant that has a return value).
// When the store reports that events were added, a pubsub.V2Initialise payload
// carrying the room ID and the snapshot ID is published on pubsub.ChanV2.
// The variant returning []json.RawMessage hands back res.PrependTimelineEvents.
// NOTE(review): this span interleaves two versions of the method (two signatures,
// paired return statements, two Notify calls). They look like removed/added lines of
// a diff hunk; confirm which set of lines the file should keep.
func (h *Handler) Initialise(roomID string, state []json.RawMessage) {
added, snapID, err := h.Store.Initialise(roomID, state)
func (h *Handler) Initialise(roomID string, state []json.RawMessage) []json.RawMessage {
res, err := h.Store.Initialise(roomID, state)
if err != nil {
// Record the failure both in the log and in sentry before giving up on this room.
logger.Err(err).Int("state", len(state)).Str("room", roomID).Msg("V2: failed to initialise room")
sentry.CaptureException(err)
return
return nil
}
if !added {
// The store added no new events, so there is nothing to publish.
return
if res.AddedEvents {
// Only notify subscribers when the store reports that events were added.
h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2Initialise{
RoomID: roomID,
SnapshotNID: res.SnapshotID,
})
}
h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2Initialise{
RoomID: roomID,
SnapshotNID: snapID,
})
return res.PrependTimelineEvents
}

func (h *Handler) SetTyping(roomID string, ephEvent json.RawMessage) {
Expand Down
28 changes: 24 additions & 4 deletions sync2/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sync2
import (
"context"
"encoding/json"
"github.com/getsentry/sentry-go"
"sync"
"sync/atomic"
"time"
Expand All @@ -23,7 +24,8 @@ type V2DataReceiver interface {
// Accumulate data for this room. This means the timeline section of the v2 response.
Accumulate(deviceID, roomID, prevBatch string, timeline []json.RawMessage) // latest pos with event nids of timeline entries
// Initialise the room, if it hasn't been already. This means the state section of the v2 response.
Initialise(roomID string, state []json.RawMessage) // snapshot ID?
// When the state block is a delta from an incremental sync, returns the state events that were
// unknown to the DB; the poller prepends these to the room's timeline.
Initialise(roomID string, state []json.RawMessage) []json.RawMessage // snapshot ID?
// SetTyping indicates which users are typing.
SetTyping(roomID string, ephEvent json.RawMessage)
// Sent when there is a new receipt
Expand Down Expand Up @@ -196,14 +198,15 @@ func (h *PollerMap) Accumulate(deviceID, roomID, prevBatch string, timeline []js
}
wg.Wait()
}
// Initialise sends a closure on h.executor that calls h.callbacks.Initialise(roomID, state),
// then blocks on a WaitGroup until that closure has called wg.Done.
// In the variant with the named result, the closure stores the callback's return value
// in result, which the bare return then hands back to the caller.
// NOTE(review): presumably whatever receives from h.executor runs the closure; that
// receiver is not visible here, so confirm.
// NOTE(review): two signatures and two callback lines are visible. They look like the
// removed/added lines of a diff hunk; confirm which ones the file should keep.
func (h *PollerMap) Initialise(roomID string, state []json.RawMessage) {
func (h *PollerMap) Initialise(roomID string, state []json.RawMessage) (result []json.RawMessage) {
var wg sync.WaitGroup
wg.Add(1)
h.executor <- func() {
h.callbacks.Initialise(roomID, state)
result = h.callbacks.Initialise(roomID, state)
wg.Done()
}
// Do not return until the closure has finished, so result is set before it is read.
wg.Wait()
return
}
func (h *PollerMap) SetTyping(roomID string, ephEvent json.RawMessage) {
var wg sync.WaitGroup
Expand Down Expand Up @@ -496,7 +499,24 @@ func (p *poller) parseRoomsResponse(res *SyncResponse) {
for roomID, roomData := range res.Rooms.Join {
if len(roomData.State.Events) > 0 {
stateCalls++
p.receiver.Initialise(roomID, roomData.State.Events)
prependStateEvents := p.receiver.Initialise(roomID, roomData.State.Events)
if len(prependStateEvents) > 0 {
// The poller has just learned of these state events due to an
// incremental poller sync; we must have missed the opportunity to see
// these down /sync in a timeline. As a workaround, inject these into
// the timeline now so that future events are received under the
// correct room state.
const warnMsg = "parseRoomsResponse: prepending state events to timeline after gappy poll"
log.Warn().Str("room_id", roomID).Int("prependStateEvents", len(prependStateEvents)).Msg(warnMsg)
sentry.WithScope(func(scope *sentry.Scope) {
scope.SetContext("sliding-sync", map[string]interface{}{
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
"room_id": roomID,
"num_prepend_state_events": len(prependStateEvents),
})
sentry.CaptureMessage(warnMsg)
})
roomData.Timeline.Events = append(prependStateEvents, roomData.Timeline.Events...)
}
}
// process typing/receipts before events so we seed the caches correctly for when we return the room
for _, ephEvent := range roomData.Ephemeral.Events {
Expand Down
5 changes: 4 additions & 1 deletion sync2/poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,14 +460,17 @@ type mockDataReceiver struct {
// Accumulate appends the given timeline events to whatever this mock has already
// recorded for roomID. The userID and prevBatch arguments are ignored.
func (a *mockDataReceiver) Accumulate(userID, roomID, prevBatch string, timeline []json.RawMessage) {
	recorded := a.timelines[roomID]
	a.timelines[roomID] = append(recorded, timeline...)
}
// Initialise records the state block for roomID in a.states, replacing any entry
// already stored under that room ID, and always returns nil.
// If a.incomingProcess is non-nil, a value is sent on it after the state is recorded;
// if a.unblockProcess is non-nil, the call then waits to receive from it.
// NOTE(review): two signatures are visible (with and without a []json.RawMessage
// result). They look like the removed/added lines of a diff hunk; confirm which one
// the file should keep.
func (a *mockDataReceiver) Initialise(roomID string, state []json.RawMessage) {
func (a *mockDataReceiver) Initialise(roomID string, state []json.RawMessage) []json.RawMessage {
a.states[roomID] = state
if a.incomingProcess != nil {
// Signal that a state block has arrived.
a.incomingProcess <- struct{}{}
}
if a.unblockProcess != nil {
// Wait here until something is sent on (or closes) unblockProcess.
<-a.unblockProcess
}
// The return value is a list of unknown state events to be prepended to the room
// timeline. That behaviour is not tested here, so return nil for now.
return nil
}
// SetTyping is a no-op in this mock: roomID and ephEvent are ignored.
func (a *mockDataReceiver) SetTyping(roomID string, ephEvent json.RawMessage) {
}
Expand Down
Loading