Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #2224 from matrix-org/erikj/prefill_state
Browse files Browse the repository at this point in the history
Prefill state caches
  • Loading branch information
erikjohnston authored May 16, 2017
2 parents d648f65 + 331570e commit b8492b6
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 9 deletions.
8 changes: 4 additions & 4 deletions synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ def __init__(self, txn, name, database_engine, after_callbacks):
object.__setattr__(self, "database_engine", database_engine)
object.__setattr__(self, "after_callbacks", after_callbacks)

def call_after(self, callback, *args):
def call_after(self, callback, *args, **kwargs):
"""Call the given callback on the main twisted thread after the
transaction has finished. Used to invalidate the caches on the
correct thread.
"""
self.after_callbacks.append((callback, args))
self.after_callbacks.append((callback, args, kwargs))

def __getattr__(self, name):
return getattr(self.txn, name)
Expand Down Expand Up @@ -319,8 +319,8 @@ def inner_func(conn, *args, **kwargs):
inner_func, *args, **kwargs
)
finally:
for after_callback, after_args in after_callbacks:
after_callback(*after_args)
for after_callback, after_args, after_kwargs in after_callbacks:
after_callback(*after_args, **after_kwargs)
defer.returnValue(result)

@defer.inlineCallbacks
Expand Down
15 changes: 10 additions & 5 deletions synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,11 @@ def _persist_events(self, events_and_contexts, backfilled=False,

event_counter.inc(event.type, origin_type, origin_entity)

for room_id, (_, _, new_state) in current_state_for_room.iteritems():
self.get_current_state_ids.prefill(
(room_id, ), new_state
)

@defer.inlineCallbacks
def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids):
"""Calculates the new forward extremeties for a room given events to
Expand Down Expand Up @@ -447,10 +452,10 @@ def _calculate_state_delta(self, room_id, events_context, new_latest_event_ids):
Assumes that we are only persisting events for one room at a time.
Returns:
2-tuple (to_delete, to_insert) where both are state dicts, i.e.
(type, state_key) -> event_id. `to_delete` are the entries to
3-tuple (to_delete, to_insert, new_state) where both are state dicts,
i.e. (type, state_key) -> event_id. `to_delete` are the entries to
first be deleted from current_state_events, `to_insert` are entries
to insert.
to insert. `new_state` is the full set of state.
May return None if there are no changes to be applied.
"""
# Now we need to work out the different state sets for
Expand Down Expand Up @@ -557,7 +562,7 @@ def get_events(ev_ids):
if ev_id in events_to_insert
}

defer.returnValue((to_delete, to_insert))
defer.returnValue((to_delete, to_insert, current_state))

@defer.inlineCallbacks
def get_event(self, event_id, check_redacted=True,
Expand Down Expand Up @@ -710,7 +715,7 @@ def _persist_events_txn(self, txn, events_and_contexts, backfilled,

def _update_current_state_txn(self, txn, state_delta_by_room):
for room_id, current_state_tuple in state_delta_by_room.iteritems():
to_delete, to_insert = current_state_tuple
to_delete, to_insert, _ = current_state_tuple
txn.executemany(
"DELETE FROM current_state_events WHERE event_id = ?",
[(ev_id,) for ev_id in to_delete.itervalues()],
Expand Down
12 changes: 12 additions & 0 deletions synapse/storage/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,18 @@ def _store_mult_state_groups_txn(self, txn, events_and_contexts):
],
)

# Prefill the state group cache with this group.
# It's fine to use the sequence like this as the state group map
# is immutable. (If the map wasn't immutable then this prefill could
# race with another update)
txn.call_after(
self._state_group_cache.update,
self._state_group_cache.sequence,
key=context.state_group,
value=dict(context.current_state_ids),
full=True,
)

self._simple_insert_many_txn(
txn,
table="event_to_state_groups",
Expand Down

0 comments on commit b8492b6

Please sign in to comment.