Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inject state blocks from incremental polls into the room timeline #71

Merged
merged 40 commits into from
Apr 24, 2023
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
38c1bda
Integration test
Apr 13, 2023
1d59167
E2E test case draft
Apr 13, 2023
46059df
WIP: update test cases
Apr 14, 2023
419e1ab
Update test case again
Apr 17, 2023
4b90454
Don't reselect NIDs after initialising a room
Apr 17, 2023
a2cba6e
We don't need `eventIDs` either
Apr 17, 2023
c19b561
Merge branch 'dmr/dont-select-after-insert-returning' into dmr/gappy-…
Apr 17, 2023
666823d
Introduce return struct for Initialise
Apr 17, 2023
4ba80d2
Initialise: handle state blocks from a gappy sync
Apr 17, 2023
9fdb001
TODO comment
Apr 17, 2023
33b174d
Fixup test code
Apr 17, 2023
ac9651c
Propagate err message
Apr 17, 2023
5d8560c
Fix db query
Apr 17, 2023
b7a8e7d
Improve logging
Apr 17, 2023
e3008e6
Fix the logging
Apr 17, 2023
2406da5
TODO note
Apr 17, 2023
f28f7d0
Return set of unknown events from the db
Apr 18, 2023
9d53a87
Simply Initialise to bail out early
Apr 18, 2023
2bae784
Propagate prepend events to poller, and inject
Apr 18, 2023
5621423
Fix tests
Apr 18, 2023
9af0471
Always look for unknown state events
Apr 18, 2023
b5893b1
Better error checking in test
Apr 18, 2023
aa07188
Fixup test
Apr 18, 2023
7f03a3d
Use test helper instead
Apr 19, 2023
e6aac43
SelectUnknownEventIDs expects a txn
Apr 19, 2023
b54ba7c
Poller test description
Apr 19, 2023
5dd4315
Test new events table query
Apr 19, 2023
76066f9
Tidy up accumulator event ID handling
Apr 19, 2023
5a9fae1
Fix capitalisation
Apr 19, 2023
b32e5da
Fix test function args too
Apr 19, 2023
00dd396
Fix MatchRoomTimelineMostRecent
Apr 19, 2023
bea931d
Fix running unit test alongside other tests
Apr 19, 2023
1680952
Tidyup unit test
Apr 19, 2023
6e7d0cb
Test memberships are updated after gappy sync
Apr 19, 2023
aa09d12
Check the new query reports multiple unknown IDs
Apr 21, 2023
8324f6e
SelectByIDs has an ordering guarantee
Apr 21, 2023
6c9aa09
Pass room ID to new query
Apr 24, 2023
2755384
Revert "Pass room ID to new query"
Apr 24, 2023
82d1d58
Update integration test
Apr 24, 2023
3274cf2
typo
Apr 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 59 additions & 29 deletions state/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,20 +131,37 @@ func (a *Accumulator) roomInfoDelta(roomID string, events []Event) RoomInfo {
}
}

type InitialiseResult struct {
// AddedEvents is true iff this call to Initialise added new state events to the DB.
AddedEvents bool
// SnapshotID is the ID of the snapshot which incorporates all added events.
// It has no meaning if AddedEvents is false.
SnapshotID int64
// PrependTimelineEvents is empty if the room was not initialised prior to this call.
// Otherwise, it is an order-preserving subset of the `state` argument to Initialise
// containing all events that were not persisted prior to the Initialise call. These
// should be prepended to the room timeline by the caller.
PrependTimelineEvents []json.RawMessage
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
}

// Initialise starts a new sync accumulator for the given room using the given state as a baseline.
// This will only take effect if this is the first time the v3 server has seen this room, and it wasn't
// possible to get all events up to the create event (e.g Matrix HQ). Returns true if this call actually
// added new events, along with the snapshot NID.
//
// This will only take effect if this is the first time the v3 server has seen this room, and it wasn't
// possible to get all events up to the create event (e.g Matrix HQ).
// This function:
// - Stores these events
// - Sets up the current snapshot based on the state list given.
func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (bool, int64, error) {
//
// If the v3 server has seen this room before, this function
// - queries the DB to determine which state events are known to the server,
// - returns (via InitialiseResult.PrependTimelineEvents) a slice of unknown state events,
//
// and otherwise does nothing.
func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (InitialiseResult, error) {
var res InitialiseResult
if len(state) == 0 {
return false, 0, nil
return res, nil
}
addedEvents := false
var snapID int64
err := sqlutil.WithTransaction(a.db, func(txn *sqlx.Tx) error {
// Attempt to short-circuit. This has to be done inside a transaction to make sure
// we don't race with multiple calls to Initialise with the same room ID.
Expand All @@ -153,8 +170,30 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (bool,
return fmt.Errorf("error fetching snapshot id for room %s: %s", roomID, err)
}
if snapshotID > 0 {
// we only initialise rooms once
logger.Info().Str("room_id", roomID).Int64("snapshot_id", snapshotID).Msg("Accumulator.Initialise called but current snapshot already exists, bailing early")
// Poller A has received a gappy sync v2 response with a state block, and
// we have seen this room before. If we knew for certain that there is some
// other active poller B in this room then we could safely skip this logic.

// Log at debug for now. If we find an unknown event, we'll return it so
// that the poller can log a warning.
logger.Debug().Str("room_id", roomID).Int64("snapshot_id", snapshotID).Msg("Accumulator.Initialise called with incremental state but current snapshot already exists.")
eventIDs := make([]string, len(state))
eventIDToRawEvent := make(map[string]json.RawMessage, len(state))
for i := range state {
eventID := gjson.ParseBytes(state[i]).Get("event_id")
if !eventID.Exists() || eventID.Type != gjson.String {
return fmt.Errorf("Event %d lacks an event ID", i)
}
eventIDToRawEvent[eventID.Str] = state[i]
eventIDs[i] = eventID.Str
}
unknownEventIDs, err := a.eventsTable.SelectUnknownEventIDs(txn, eventIDs)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be improved if we made use of the room ID as it can cut lookup times down a lot.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I'd just assumed this was indexed on event id---maybe we should add that index?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Err, the event_id column is UNIQUE, so an index on it will already exist. Not sure why the room ID would help here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The room ID can help as it cuts down the dataset immediately, rather than relying on the mahoosive event ID index of all events ever. In the past it has been beneficial to include this information when running EXPLAIN.

Copy link
Contributor Author

@DMRobertson DMRobertson Apr 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remain sceptical.

-- Matrix HQ
\set room_id '!OGEhHVWSdvArJzumhm:matrix.org'
-- 20 event Ids taken from Matrix HQ, plus two fake event IDs
\set event_ids {$fakeevent1,$4jteAg_T0BMXWiSRzZbptOF9zETeomXbJoDtg6zs9SU,$Iu-74VzJ0B4Xc6XgBDAKoWYZkFcUGCuJ_vLd47Cf4Ok,$OIqjRnK_rXBApl_8x4s0Iq--OH969cikkvjGYW5Seq8,$7EYkESAKR-t6wKLG7rjPyyVcZdQ-v0YQtaLbzrLJ2vI,$6LYIKlvAlC1ILMhIsWx-M9IDOarFBLM_Ni609bz1NDI,$w1sLe53ZqsRc9vebfLS8JCUyq3ql9DRhN-lERWsZRHQ,$A6k3YCykEOHCzsbzxQjdupwNdt4hr4AeS6IWOnEG1H4,$K1EGNY8C-McIJJ5PBbd1GlUFMuw84S_QlbomqnYMiYY,$QgyACr1TujWs846B5tiAkbM9JkBEzUoMK6QgOfXDc64,$f2aefO_6yYMhSFRtQWYkTDdie87ldgTJD1h3JQ2F_3Q,$pYU_XgUXF0kQXvtSjG0boNdMHtXGBIFfwBDqxZuU1IE,$cZ5XF2o7ObQqROkJZWSubgm4cLgBX8j1nkrcrHv0rZ0,$wXTQjHKMTsMBqj3B4BLN40L0-7tz1NbG8WesWgGcjwY,$csN_i4UoBTk5jFzdDAxnKSmKg0Hp6U7Af_ifht07D-o,$NBsWMM0MJ_XcAt9gk9ZJxV3_v3tPfvmZ1azHI3Wb31A,$bf1IpaCxsC2Te4mWrBXcJZqesoh1HsssdaI_U1OzXuQ,$nsLyOZZRywB1bGwEVNR3qQFLuY3hGOuM-26J6IAvsgk,$-b0T_rqr5-vE1FiqwYOriXkTE0c8oMXj0a9uSeMnWVw,$qx3qMhu6ba_ceFcv9ADA5J45CTUhhAMDz44eOVmJwR8,$ugwvSmk0LfVLpxk0cloQ6tQxjacdp0ojurri3pN4bl4,$notanevent2}

Query without room ID:

EXPLAIN ANALYZE VERBOSE
WITH maybe_unknown_events(event_id) AS (SELECT unnest(:'event_ids'::text[]))
	SELECT *
	FROM maybe_unknown_events LEFT JOIN syncv3_events USING(event_id)
	WHERE event_nid IS NULL;
                                         QUERY PLAN                                                                                                                                                                                                                                         
                                                                                                                                                                                                                                                                                            
                                                                                            
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------
 Nested Loop Left Join  (cost=0.56..189.27 rows=1 width=648) (actual time=0.030..0.239 rows=2 loops=1)
   Output: (unnest('{$fakeevent1,$4jteAg_T0BMXWiSRzZbptOF9zETeomXbJoDtg6zs9SU,$Iu-74VzJ0B4Xc6XgBDAKoWYZkFcUGCuJ_vLd47Cf4Ok,$OIqjRnK_rXBApl_8x4s0Iq--OH969cikkvjGYW5Seq8,$7EYkESAKR-t6wKLG7rjPyyVcZdQ-v0YQtaLbzrLJ2vI,$6LYIKlvAlC1ILMhIsWx-M9IDOarFBLM_Ni609bz1NDI,$w1sLe53ZqsRc9vebfLS8JCUyq
3ql9DRhN-lERWsZRHQ,$A6k3YCykEOHCzsbzxQjdupwNdt4hr4AeS6IWOnEG1H4,$K1EGNY8C-McIJJ5PBbd1GlUFMuw84S_QlbomqnYMiYY,$QgyACr1TujWs846B5tiAkbM9JkBEzUoMK6QgOfXDc64,$f2aefO_6yYMhSFRtQWYkTDdie87ldgTJD1h3JQ2F_3Q,$pYU_XgUXF0kQXvtSjG0boNdMHtXGBIFfwBDqxZuU1IE,$cZ5XF2o7ObQqROkJZWSubgm4cLgBX8j1nkrcrHv
0rZ0,$wXTQjHKMTsMBqj3B4BLN40L0-7tz1NbG8WesWgGcjwY,$csN_i4UoBTk5jFzdDAxnKSmKg0Hp6U7Af_ifht07D-o,$NBsWMM0MJ_XcAt9gk9ZJxV3_v3tPfvmZ1azHI3Wb31A,$bf1IpaCxsC2Te4mWrBXcJZqesoh1HsssdaI_U1OzXuQ,$nsLyOZZRywB1bGwEVNR3qQFLuY3hGOuM-26J6IAvsgk,$-b0T_rqr5-vE1FiqwYOriXkTE0c8oMXj0a9uSeMnWVw,$qx3qMhu6
ba_ceFcv9ADA5J45CTUhhAMDz44eOVmJwR8,$ugwvSmk0LfVLpxk0cloQ6tQxjacdp0ojurri3pN4bl4,$notanevent2}'::text[])), syncv3_events.event_nid, syncv3_events.before_state_snapshot_id, syncv3_events.event_replaces_nid, syncv3_events.room_id, syncv3_events.event_type, syncv3_events.state_key, sync
v3_events.prev_batch, syncv3_events.membership, syncv3_events.is_state, syncv3_events.event
   Inner Unique: true
   Filter: (syncv3_events.event_nid IS NULL)
   Rows Removed by Filter: 20
   ->  ProjectSet  (cost=0.00..0.13 rows=22 width=32) (actual time=0.003..0.007 rows=22 loops=1)
         Output: unnest('{$fakeevent1,$4jteAg_T0BMXWiSRzZbptOF9zETeomXbJoDtg6zs9SU,$Iu-74VzJ0B4Xc6XgBDAKoWYZkFcUGCuJ_vLd47Cf4Ok,$OIqjRnK_rXBApl_8x4s0Iq--OH969cikkvjGYW5Seq8,$7EYkESAKR-t6wKLG7rjPyyVcZdQ-v0YQtaLbzrLJ2vI,$6LYIKlvAlC1ILMhIsWx-M9IDOarFBLM_Ni609bz1NDI,$w1sLe53ZqsRc9vebfLS8
JCUyq3ql9DRhN-lERWsZRHQ,$A6k3YCykEOHCzsbzxQjdupwNdt4hr4AeS6IWOnEG1H4,$K1EGNY8C-McIJJ5PBbd1GlUFMuw84S_QlbomqnYMiYY,$QgyACr1TujWs846B5tiAkbM9JkBEzUoMK6QgOfXDc64,$f2aefO_6yYMhSFRtQWYkTDdie87ldgTJD1h3JQ2F_3Q,$pYU_XgUXF0kQXvtSjG0boNdMHtXGBIFfwBDqxZuU1IE,$cZ5XF2o7ObQqROkJZWSubgm4cLgBX8j1nk
rcrHv0rZ0,$wXTQjHKMTsMBqj3B4BLN40L0-7tz1NbG8WesWgGcjwY,$csN_i4UoBTk5jFzdDAxnKSmKg0Hp6U7Af_ifht07D-o,$NBsWMM0MJ_XcAt9gk9ZJxV3_v3tPfvmZ1azHI3Wb31A,$bf1IpaCxsC2Te4mWrBXcJZqesoh1HsssdaI_U1OzXuQ,$nsLyOZZRywB1bGwEVNR3qQFLuY3hGOuM-26J6IAvsgk,$-b0T_rqr5-vE1FiqwYOriXkTE0c8oMXj0a9uSeMnWVw,$qx3
qMhu6ba_ceFcv9ADA5J45CTUhhAMDz44eOVmJwR8,$ugwvSmk0LfVLpxk0cloQ6tQxjacdp0ojurri3pN4bl4,$notanevent2}'::text[])
         ->  Result  (cost=0.00..0.01 rows=1 width=0) (actual time=0.001..0.001 rows=1 loops=1)
   ->  Index Scan using syncv3_events_event_id_key on public.syncv3_events  (cost=0.56..8.58 rows=1 width=658) (actual time=0.010..0.010 rows=1 loops=22)
         Output: syncv3_events.event_nid, syncv3_events.event_id, syncv3_events.before_state_snapshot_id, syncv3_events.event_replaces_nid, syncv3_events.room_id, syncv3_events.event_type, syncv3_events.state_key, syncv3_events.prev_batch, syncv3_events.membership, syncv3_events.is_s
tate, syncv3_events.event
         Index Cond: (syncv3_events.event_id = (unnest('{$fakeevent1,$4jteAg_T0BMXWiSRzZbptOF9zETeomXbJoDtg6zs9SU,$Iu-74VzJ0B4Xc6XgBDAKoWYZkFcUGCuJ_vLd47Cf4Ok,$OIqjRnK_rXBApl_8x4s0Iq--OH969cikkvjGYW5Seq8,$7EYkESAKR-t6wKLG7rjPyyVcZdQ-v0YQtaLbzrLJ2vI,$6LYIKlvAlC1ILMhIsWx-M9IDOarFBLM_Ni
609bz1NDI,$w1sLe53ZqsRc9vebfLS8JCUyq3ql9DRhN-lERWsZRHQ,$A6k3YCykEOHCzsbzxQjdupwNdt4hr4AeS6IWOnEG1H4,$K1EGNY8C-McIJJ5PBbd1GlUFMuw84S_QlbomqnYMiYY,$QgyACr1TujWs846B5tiAkbM9JkBEzUoMK6QgOfXDc64,$f2aefO_6yYMhSFRtQWYkTDdie87ldgTJD1h3JQ2F_3Q,$pYU_XgUXF0kQXvtSjG0boNdMHtXGBIFfwBDqxZuU1IE,$cZ5
XF2o7ObQqROkJZWSubgm4cLgBX8j1nkrcrHv0rZ0,$wXTQjHKMTsMBqj3B4BLN40L0-7tz1NbG8WesWgGcjwY,$csN_i4UoBTk5jFzdDAxnKSmKg0Hp6U7Af_ifht07D-o,$NBsWMM0MJ_XcAt9gk9ZJxV3_v3tPfvmZ1azHI3Wb31A,$bf1IpaCxsC2Te4mWrBXcJZqesoh1HsssdaI_U1OzXuQ,$nsLyOZZRywB1bGwEVNR3qQFLuY3hGOuM-26J6IAvsgk,$-b0T_rqr5-vE1Fiqw
YOriXkTE0c8oMXj0a9uSeMnWVw,$qx3qMhu6ba_ceFcv9ADA5J45CTUhhAMDz44eOVmJwR8,$ugwvSmk0LfVLpxk0cloQ6tQxjacdp0ojurri3pN4bl4,$notanevent2}'::text[])))
 Query Identifier: -467502754092800539
 Planning Time: 0.142 ms
 Execution Time: 0.264 ms

Query also checking room ID (note it is not just a case of appending WHERE room_id = ...):

EXPLAIN ANALYZE VERBOSE
WITH maybe_unknown_events(event_id) AS (SELECT unnest(:'event_ids'::text[]))
	SELECT *
	FROM maybe_unknown_events LEFT JOIN syncv3_events ON maybe_unknown_events.event_id = syncv3_events.event_id AND room_id = :'room_id'
	WHERE event_nid IS NULL;
	                                                     QUERY PLAN                                                                                                                                                                                                                             
                                                                                                                                                                                                                                                                                            
                                                                                                                    
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------
 Nested Loop Left Join  (cost=0.56..189.33 rows=1 width=690) (actual time=0.053..0.262 rows=2 loops=1)
   Output: (unnest('{$fakeevent1,$4jteAg_T0BMXWiSRzZbptOF9zETeomXbJoDtg6zs9SU,$Iu-74VzJ0B4Xc6XgBDAKoWYZkFcUGCuJ_vLd47Cf4Ok,$OIqjRnK_rXBApl_8x4s0Iq--OH969cikkvjGYW5Seq8,$7EYkESAKR-t6wKLG7rjPyyVcZdQ-v0YQtaLbzrLJ2vI,$6LYIKlvAlC1ILMhIsWx-M9IDOarFBLM_Ni609bz1NDI,$w1sLe53ZqsRc9vebfLS8JCUyq
3ql9DRhN-lERWsZRHQ,$A6k3YCykEOHCzsbzxQjdupwNdt4hr4AeS6IWOnEG1H4,$K1EGNY8C-McIJJ5PBbd1GlUFMuw84S_QlbomqnYMiYY,$QgyACr1TujWs846B5tiAkbM9JkBEzUoMK6QgOfXDc64,$f2aefO_6yYMhSFRtQWYkTDdie87ldgTJD1h3JQ2F_3Q,$pYU_XgUXF0kQXvtSjG0boNdMHtXGBIFfwBDqxZuU1IE,$cZ5XF2o7ObQqROkJZWSubgm4cLgBX8j1nkrcrHv
0rZ0,$wXTQjHKMTsMBqj3B4BLN40L0-7tz1NbG8WesWgGcjwY,$csN_i4UoBTk5jFzdDAxnKSmKg0Hp6U7Af_ifht07D-o,$NBsWMM0MJ_XcAt9gk9ZJxV3_v3tPfvmZ1azHI3Wb31A,$bf1IpaCxsC2Te4mWrBXcJZqesoh1HsssdaI_U1OzXuQ,$nsLyOZZRywB1bGwEVNR3qQFLuY3hGOuM-26J6IAvsgk,$-b0T_rqr5-vE1FiqwYOriXkTE0c8oMXj0a9uSeMnWVw,$qx3qMhu6
ba_ceFcv9ADA5J45CTUhhAMDz44eOVmJwR8,$ugwvSmk0LfVLpxk0cloQ6tQxjacdp0ojurri3pN4bl4,$notanevent2}'::text[])), syncv3_events.event_nid, syncv3_events.event_id, syncv3_events.before_state_snapshot_id, syncv3_events.event_replaces_nid, syncv3_events.room_id, syncv3_events.event_type, syncv
3_events.state_key, syncv3_events.prev_batch, syncv3_events.membership, syncv3_events.is_state, syncv3_events.event
   Inner Unique: true
   Filter: (syncv3_events.event_nid IS NULL)
   Rows Removed by Filter: 20
   ->  ProjectSet  (cost=0.00..0.13 rows=22 width=32) (actual time=0.003..0.006 rows=22 loops=1)
         Output: unnest('{$fakeevent1,$4jteAg_T0BMXWiSRzZbptOF9zETeomXbJoDtg6zs9SU,$Iu-74VzJ0B4Xc6XgBDAKoWYZkFcUGCuJ_vLd47Cf4Ok,$OIqjRnK_rXBApl_8x4s0Iq--OH969cikkvjGYW5Seq8,$7EYkESAKR-t6wKLG7rjPyyVcZdQ-v0YQtaLbzrLJ2vI,$6LYIKlvAlC1ILMhIsWx-M9IDOarFBLM_Ni609bz1NDI,$w1sLe53ZqsRc9vebfLS8
JCUyq3ql9DRhN-lERWsZRHQ,$A6k3YCykEOHCzsbzxQjdupwNdt4hr4AeS6IWOnEG1H4,$K1EGNY8C-McIJJ5PBbd1GlUFMuw84S_QlbomqnYMiYY,$QgyACr1TujWs846B5tiAkbM9JkBEzUoMK6QgOfXDc64,$f2aefO_6yYMhSFRtQWYkTDdie87ldgTJD1h3JQ2F_3Q,$pYU_XgUXF0kQXvtSjG0boNdMHtXGBIFfwBDqxZuU1IE,$cZ5XF2o7ObQqROkJZWSubgm4cLgBX8j1nk
rcrHv0rZ0,$wXTQjHKMTsMBqj3B4BLN40L0-7tz1NbG8WesWgGcjwY,$csN_i4UoBTk5jFzdDAxnKSmKg0Hp6U7Af_ifht07D-o,$NBsWMM0MJ_XcAt9gk9ZJxV3_v3tPfvmZ1azHI3Wb31A,$bf1IpaCxsC2Te4mWrBXcJZqesoh1HsssdaI_U1OzXuQ,$nsLyOZZRywB1bGwEVNR3qQFLuY3hGOuM-26J6IAvsgk,$-b0T_rqr5-vE1FiqwYOriXkTE0c8oMXj0a9uSeMnWVw,$qx3
qMhu6ba_ceFcv9ADA5J45CTUhhAMDz44eOVmJwR8,$ugwvSmk0LfVLpxk0cloQ6tQxjacdp0ojurri3pN4bl4,$notanevent2}'::text[])
         ->  Result  (cost=0.00..0.01 rows=1 width=0) (actual time=0.001..0.001 rows=1 loops=1)
   ->  Index Scan using syncv3_events_event_id_key on public.syncv3_events  (cost=0.56..8.58 rows=1 width=658) (actual time=0.011..0.011 rows=1 loops=22)
         Output: syncv3_events.event_nid, syncv3_events.event_id, syncv3_events.before_state_snapshot_id, syncv3_events.event_replaces_nid, syncv3_events.room_id, syncv3_events.event_type, syncv3_events.state_key, syncv3_events.prev_batch, syncv3_events.membership, syncv3_events.is_s
tate, syncv3_events.event
         Index Cond: (syncv3_events.event_id = (unnest('{$fakeevent1,$4jteAg_T0BMXWiSRzZbptOF9zETeomXbJoDtg6zs9SU,$Iu-74VzJ0B4Xc6XgBDAKoWYZkFcUGCuJ_vLd47Cf4Ok,$OIqjRnK_rXBApl_8x4s0Iq--OH969cikkvjGYW5Seq8,$7EYkESAKR-t6wKLG7rjPyyVcZdQ-v0YQtaLbzrLJ2vI,$6LYIKlvAlC1ILMhIsWx-M9IDOarFBLM_Ni
609bz1NDI,$w1sLe53ZqsRc9vebfLS8JCUyq3ql9DRhN-lERWsZRHQ,$A6k3YCykEOHCzsbzxQjdupwNdt4hr4AeS6IWOnEG1H4,$K1EGNY8C-McIJJ5PBbd1GlUFMuw84S_QlbomqnYMiYY,$QgyACr1TujWs846B5tiAkbM9JkBEzUoMK6QgOfXDc64,$f2aefO_6yYMhSFRtQWYkTDdie87ldgTJD1h3JQ2F_3Q,$pYU_XgUXF0kQXvtSjG0boNdMHtXGBIFfwBDqxZuU1IE,$cZ5
XF2o7ObQqROkJZWSubgm4cLgBX8j1nkrcrHv0rZ0,$wXTQjHKMTsMBqj3B4BLN40L0-7tz1NbG8WesWgGcjwY,$csN_i4UoBTk5jFzdDAxnKSmKg0Hp6U7Af_ifht07D-o,$NBsWMM0MJ_XcAt9gk9ZJxV3_v3tPfvmZ1azHI3Wb31A,$bf1IpaCxsC2Te4mWrBXcJZqesoh1HsssdaI_U1OzXuQ,$nsLyOZZRywB1bGwEVNR3qQFLuY3hGOuM-26J6IAvsgk,$-b0T_rqr5-vE1Fiqw
YOriXkTE0c8oMXj0a9uSeMnWVw,$qx3qMhu6ba_ceFcv9ADA5J45CTUhhAMDz44eOVmJwR8,$ugwvSmk0LfVLpxk0cloQ6tQxjacdp0ojurri3pN4bl4,$notanevent2}'::text[])))
         Filter: (syncv3_events.room_id = '!OGEhHVWSdvArJzumhm:matrix.org'::text)
 Query Identifier: 2484314538527637739
 Planning Time: 0.163 ms
 Execution Time: 0.285 ms

AFAICS the only difference is an additional

     Filter: (syncv3_events.room_id = '!OGEhHVWSdvArJzumhm:matrix.org'::text)

at the bottom of the second query plan, which removes no rows anyway. The time diff (~+40 microseconds) is positive. (Though this isn't a statistical analysis; that diff may be just noise.)

if err != nil {
return fmt.Errorf("error determing which event IDs are unknown: %s", err)
}
for unknownEventID := range unknownEventIDs {
res.PrependTimelineEvents = append(res.PrependTimelineEvents, eventIDToRawEvent[unknownEventID])
}
return nil
}

Expand Down Expand Up @@ -184,28 +223,19 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (bool,
}

// pull out the event NIDs we just inserted
eventIDs := make([]string, len(events))
membershipEventIDs := make(map[string]struct{}, len(events))
for i := range eventIDs {
eventIDs[i] = events[i].ID
if events[i].Type == "m.room.member" {
membershipEventIDs[events[i].ID] = struct{}{}
for _, event := range events {
if event.Type == "m.room.member" {
membershipEventIDs[event.ID] = struct{}{}
}
}
idToNIDs, err := a.eventsTable.SelectNIDsByIDs(txn, eventIDs)
if err != nil {
return fmt.Errorf("failed to select NIDs for inserted events: %w", err)
}
if len(idToNIDs) != len(eventIDs) {
return fmt.Errorf("missing events just inserted, asked for %v got %v", eventIDs, idToNIDs)
}
memberNIDs := make([]int64, 0, len(idToNIDs))
otherNIDs := make([]int64, 0, len(idToNIDs))
for evID, nid := range idToNIDs {
memberNIDs := make([]int64, 0, len(eventIDToNID))
otherNIDs := make([]int64, 0, len(eventIDToNID))
for evID, nid := range eventIDToNID {
if _, exists := membershipEventIDs[evID]; exists {
memberNIDs = append(memberNIDs, nid)
memberNIDs = append(memberNIDs, int64(nid))
} else {
otherNIDs = append(otherNIDs, nid)
otherNIDs = append(otherNIDs, int64(nid))
}
}

Expand All @@ -219,7 +249,7 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (bool,
if err != nil {
return fmt.Errorf("failed to insert snapshot: %w", err)
}
addedEvents = true
res.AddedEvents = true
latestNID := int64(0)
for _, nid := range otherNIDs {
if nid > latestNID {
Expand All @@ -244,10 +274,10 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (bool,
// will have an associated state snapshot ID on the event.

// Set the snapshot ID as the current state
snapID = snapshot.SnapshotID
res.SnapshotID = snapshot.SnapshotID
return a.roomsTable.Upsert(txn, info, snapshot.SnapshotID, latestNID)
})
return addedEvents, snapID, err
return res, err
}

// Accumulate internal state from a user's sync response. The timeline order MUST be in the order
Expand Down
22 changes: 11 additions & 11 deletions state/accumulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ func TestAccumulatorInitialise(t *testing.T) {
db, close := connectToDB(t)
defer close()
accumulator := NewAccumulator(db)
added, initSnapID, err := accumulator.Initialise(roomID, roomEvents)
res, err := accumulator.Initialise(roomID, roomEvents)
if err != nil {
t.Fatalf("falied to Initialise accumulator: %s", err)
}
if !added {
if !res.AddedEvents {
t.Fatalf("didn't add events, wanted it to")
}

Expand All @@ -45,8 +45,8 @@ func TestAccumulatorInitialise(t *testing.T) {
if snapID == 0 {
t.Fatalf("Initialise did not store a current snapshot")
}
if snapID != initSnapID {
t.Fatalf("Initialise returned wrong snapshot ID, got %v want %v", initSnapID, snapID)
if snapID != res.SnapshotID {
t.Fatalf("Initialise returned wrong snapshot ID, got %v want %v", res.SnapshotID, snapID)
}

// this snapshot should have 1 member event and 2 other events in it
Expand Down Expand Up @@ -80,11 +80,11 @@ func TestAccumulatorInitialise(t *testing.T) {
}

// Subsequent calls do nothing and are not an error
added, _, err = accumulator.Initialise(roomID, roomEvents)
res, err = accumulator.Initialise(roomID, roomEvents)
if err != nil {
t.Fatalf("falied to Initialise accumulator: %s", err)
}
if added {
if res.AddedEvents {
t.Fatalf("added events when it shouldn't have")
}
}
Expand All @@ -99,7 +99,7 @@ func TestAccumulatorAccumulate(t *testing.T) {
db, close := connectToDB(t)
defer close()
accumulator := NewAccumulator(db)
_, _, err := accumulator.Initialise(roomID, roomEvents)
_, err := accumulator.Initialise(roomID, roomEvents)
if err != nil {
t.Fatalf("failed to Initialise accumulator: %s", err)
}
Expand Down Expand Up @@ -195,7 +195,7 @@ func TestAccumulatorDelta(t *testing.T) {
db, close := connectToDB(t)
defer close()
accumulator := NewAccumulator(db)
_, _, err := accumulator.Initialise(roomID, nil)
_, err := accumulator.Initialise(roomID, nil)
if err != nil {
t.Fatalf("failed to Initialise accumulator: %s", err)
}
Expand Down Expand Up @@ -244,7 +244,7 @@ func TestAccumulatorMembershipLogs(t *testing.T) {
db, close := connectToDB(t)
defer close()
accumulator := NewAccumulator(db)
_, _, err := accumulator.Initialise(roomID, nil)
_, err := accumulator.Initialise(roomID, nil)
if err != nil {
t.Fatalf("failed to Initialise accumulator: %s", err)
}
Expand Down Expand Up @@ -384,7 +384,7 @@ func TestAccumulatorDupeEvents(t *testing.T) {
defer close()
accumulator := NewAccumulator(db)
roomID := "!buggy:localhost"
_, _, err := accumulator.Initialise(roomID, joinRoom.State.Events)
_, err := accumulator.Initialise(roomID, joinRoom.State.Events)
if err != nil {
t.Fatalf("failed to Initialise accumulator: %s", err)
}
Expand Down Expand Up @@ -426,7 +426,7 @@ func TestAccumulatorMisorderedGraceful(t *testing.T) {
accumulator := NewAccumulator(db)
roomID := "!TestAccumulatorStateReset:localhost"
// Create a room with initial state A,C
_, _, err := accumulator.Initialise(roomID, []json.RawMessage{
_, err := accumulator.Initialise(roomID, []json.RawMessage{
eventA, eventC,
})
if err != nil {
Expand Down
29 changes: 29 additions & 0 deletions state/event_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ func (t *EventTable) SelectHighestNID() (highest int64, err error) {
}

// Insert events into the event table. Returns a map of event ID to NID for new events only.
// The NIDs assigned to new events will respect the order of the given events, e.g. if
// we insert new events A and B in that order, then NID(A) < NID(B).
func (t *EventTable) Insert(txn *sqlx.Tx, events []Event, checkFields bool) (map[string]int, error) {
if checkFields {
ensureFieldsSet(events)
Expand Down Expand Up @@ -211,6 +213,10 @@ func (t *EventTable) SelectByNIDs(txn *sqlx.Tx, verifyAll bool, nids []int64) (e
WHERE event_nid = ANY ($1) ORDER BY event_nid ASC;`, pq.Int64Array(nids))
}

// SelectByIDs fetches all events with the given event IDs from the DB as Event structs.
// If verifyAll is true, the function will check that each event ID has a matching
// event row in the database. The returned events are ordered by ascending NID; the
// order of the event IDs is irrelevant.
func (t *EventTable) SelectByIDs(txn *sqlx.Tx, verifyAll bool, ids []string) (events []Event, err error) {
wanted := 0
if verifyAll {
Expand Down Expand Up @@ -259,6 +265,29 @@ func (t *EventTable) SelectStrippedEventsByIDs(txn *sqlx.Tx, verifyAll bool, ids

}

// SelectUnknownEventIDs accepts a list of event IDs and returns the subset of those which are not known to the DB.
// It MUST be called within a transaction, or else will panic.
func (t *EventTable) SelectUnknownEventIDs(txn *sqlx.Tx, maybeUnknownEventIDs []string) (map[string]struct{}, error) {
// Note: in practice, the order of rows returned matches the order of the input
// array entries. But I don't think that's guaranteed. Return an (unordered) set
// out of paranoia.
queryStr := `
WITH maybe_unknown_events(event_id) AS (SELECT unnest($1::text[]))
SELECT event_id
FROM maybe_unknown_events LEFT JOIN syncv3_events USING(event_id)
WHERE event_nid IS NULL;`
Comment on lines +275 to +278
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did test this in a postgres playground, but I think this might be worth a unittest.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please do. There is event_table_test.go to do that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't use this style of query anywhere in the proxy atm. I'm guessing you do this so you can "select then negate" all on the DB side? I'd expected to see something more like:

SELECT event_id FROM syncv3_events WHERE room_id=$1 AND event_id=ANY($2)

but then you'd need to negate on the Go side to find the missing ones. I wonder which is better.

Copy link
Contributor Author

@DMRobertson DMRobertson Apr 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the thinking was basically to get the DB to tell me which events are unknown, rather than getting it to tell me which events are known and then computing the complement. (I'm assuming that we know about most state events, so we may as well get the DB to transmit the smaller set back to us.)

I don't think it makes a huge difference if the state block and room are small. I'm slightly worried about hitting this query for Matrix HQ with state block from a new poller's initial sync.


var unknownEventIDs []string
if err := txn.Select(&unknownEventIDs, queryStr, pq.StringArray(maybeUnknownEventIDs)); err != nil {
return nil, err
}
unknownMap := make(map[string]struct{}, len(unknownEventIDs))
for _, eventID := range unknownEventIDs {
unknownMap[eventID] = struct{}{}
}
return unknownMap, nil
}

// UpdateBeforeSnapshotID sets the before_state_snapshot_id field to `snapID` for the given NIDs.
func (t *EventTable) UpdateBeforeSnapshotID(txn *sqlx.Tx, eventNID, snapID, replacesNID int64) error {
_, err := txn.Exec(
Expand Down
76 changes: 76 additions & 0 deletions state/event_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,3 +870,79 @@ func TestRemoveUnsignedTXNID(t *testing.T) {
}
}
}

func TestEventTableSelectUnknownEventIDs(t *testing.T) {
db, close := connectToDB(t)
defer close()
txn, err := db.Beginx()
if err != nil {
t.Fatalf("failed to start txn: %s", err)
}
defer txn.Rollback()
const roomID = "!1:localhost"

// Note: there shouldn't be any other events with these IDs inserted before this
// transaction. $A and $B seem to be inserted and commit in TestEventTablePrevBatch.
const eventID1 = "$A-SelectUnknownEventIDs"
const eventID2 = "$B-SelectUnknownEventIDs"

knownEvents := []Event{
{
Type: "m.room.create",
StateKey: "",
IsState: true,
ID: eventID1,
RoomID: roomID,
},
{
Type: "m.room.name",
StateKey: "",
IsState: true,
ID: eventID2,
RoomID: roomID,
},
}
table := NewEventTable(db)

// Check the event IDs haven't been added by another test.
gotEvents, err := table.SelectByIDs(txn, true, []string{eventID1, eventID2})
if len(gotEvents) > 0 {
t.Fatalf("Event IDs already in use---commited by another test?")
}

// Insert the events
_, err = table.Insert(txn, knownEvents, false)
if err != nil {
t.Fatalf("failed to insert event: %s", err)
}

gotEvents, err = table.SelectByIDs(txn, true, []string{eventID1, eventID2})
if err != nil {
t.Fatalf("failed to select events: %s", err)
}
if gotEvents[0].ID == eventID1 && gotEvents[1].ID == eventID2 {
t.Logf("Got expected event IDs after insert. NIDS: %s=%d, %s=%d", gotEvents[0].ID, gotEvents[0].NID, gotEvents[1].ID, gotEvents[1].NID)
} else {
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
t.Fatalf("Event ID mismatch: expected $A-SelectUnknownEventIDs and $B-SelectUnknownEventIDs, got %v", gotEvents)
}

// Someone else tells us the state of the room is {A, C}. Query which of those
// event IDs are unknown.
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
shouldBeUnknownIDs := []string{"$C-SelectUnknownEventIDs", "$D-SelectUnknownEventIDs"}
stateBlockIDs := append(shouldBeUnknownIDs, eventID1)
unknownIDs, err := table.SelectUnknownEventIDs(txn, stateBlockIDs)
t.Logf("unknownIDs=%v", unknownIDs)
if err != nil {
t.Fatalf("failed to select unknown state events: %s", err)
}

// Event C and D should be flagged as unknown.
if len(unknownIDs) != len(shouldBeUnknownIDs) {
t.Fatalf("Expected %d unknown ids, got %v", len(shouldBeUnknownIDs), unknownIDs)
}
for _, unknownEventID := range shouldBeUnknownIDs {
if _, ok := unknownIDs[unknownEventID]; !ok {
t.Errorf("Expected %s to be unknown to the DB, but it wasn't", unknownEventID)
}
}
}
2 changes: 1 addition & 1 deletion state/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func (s *Storage) Accumulate(roomID, prevBatch string, timeline []json.RawMessag
return s.accumulator.Accumulate(roomID, prevBatch, timeline)
}

// Initialise passes roomID and state straight through to the accumulator's
// Initialise and returns its result unchanged.
//
// NOTE(review): the two signature lines below look like the before/after sides of
// a diff hunk (the return type changing from (bool, int64, error) to
// (InitialiseResult, error)) — confirm which one is live in the actual file.
func (s *Storage) Initialise(roomID string, state []json.RawMessage) (bool, int64, error) {
func (s *Storage) Initialise(roomID string, state []json.RawMessage) (InitialiseResult, error) {
return s.accumulator.Initialise(roomID, state)
}

Expand Down
6 changes: 3 additions & 3 deletions state/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func TestVisibleEventNIDsBetween(t *testing.T) {
},
}
for roomID, eventMap := range roomIDToEventMap {
_, _, err := store.Initialise(roomID, eventMap)
_, err := store.Initialise(roomID, eventMap)
if err != nil {
t.Fatalf("Initialise on %s failed: %s", roomID, err)
}
Expand Down Expand Up @@ -512,7 +512,7 @@ func TestStorageLatestEventsInRoomsPrevBatch(t *testing.T) {
},
}

_, _, err := store.Initialise(roomID, stateEvents)
_, err := store.Initialise(roomID, stateEvents)
if err != nil {
t.Fatalf("failed to initialise: %s", err)
}
Expand Down Expand Up @@ -616,7 +616,7 @@ func TestGlobalSnapshot(t *testing.T) {
store := NewStorage(postgresConnectionString)
defer store.Teardown()
for roomID, stateEvents := range roomIDToEventMap {
_, _, err := store.Initialise(roomID, stateEvents)
_, err := store.Initialise(roomID, stateEvents)
assertNoError(t, err)
}
snapshot, err := store.GlobalSnapshot()
Expand Down
Loading