This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 68db233

Handle race between persisting an event and un-partial stating a room (#13100)
Whenever we want to persist an event, we first compute an event context, which includes the state at the event and a flag indicating whether the state is partial. After a lot of processing, we finally try to store the event in the database, which can fail for partial state events when the containing room has been un-partial stated in the meantime.

We detect the race as a foreign key constraint failure in the data store layer and turn it into a special `PartialStateConflictError` exception, which makes its way up to the method in which we computed the event context.

To make things difficult, the exception needs to cross a replication request: `/fed_send_events` for events coming over federation and `/send_event` for events from clients. We transport the `PartialStateConflictError` as a `409 Conflict` over replication and turn `409`s back into `PartialStateConflictError`s on the worker making the request.

All client events go through `EventCreationHandler.handle_new_client_event`, which is called in *a lot* of places. Instead of trying to update all the code which creates client events, we turn the `PartialStateConflictError` into a `429 Too Many Requests` in `EventCreationHandler.handle_new_client_event` and hope that clients take it as a hint to retry their request.

On the federation event side, there are 7 places which compute event contexts. 4 of them use outlier event contexts: `FederationEventHandler._auth_and_persist_outliers_inner`, `FederationHandler.do_knock`, `FederationHandler.on_invite_request` and `FederationHandler.do_remotely_reject_invite`. These events won't have the partial state flag, so we do not need to do anything for them.

The remaining 3 paths which create events are `FederationEventHandler.process_remote_join`, `FederationEventHandler.on_send_membership_event` and `FederationEventHandler._process_received_pdu`.

We can't experience the race in `process_remote_join`, unless we're handling an additional join into a partial state room, which currently blocks, so we make no attempt to handle it correctly.

`on_send_membership_event` is only called by `FederationServer._on_send_membership_event`, so we catch the `PartialStateConflictError` there and retry just once.

`_process_received_pdu` is called by `on_receive_pdu` for incoming events and `_process_pulled_event` for backfill. The latter should never try to persist partial state events, so we ignore it. We catch the `PartialStateConflictError` in `on_receive_pdu` and retry just once.

Referring to the graph of code paths in #12988 (comment) may make the above make more sense.

Signed-off-by: Sean Quah <[email protected]>
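
The flow described above can be illustrated with a small, self-contained sketch. It is not Synapse code: `FakeEventWriter`, `HttpError`, `replication_endpoint`, `send_over_replication`, `compute_context_and_persist` and the `during_processing` hook are hypothetical names invented for illustration, and the replication request is modelled as a plain function call that returns a status code. Only the overall shape (conflict raised at the storage layer, carried as a `409`, turned back into an exception, then either retried once or surfaced as a `429`) follows the commit message.

```python
from http import HTTPStatus
from typing import Callable, List, Optional


class PartialStateConflictError(Exception):
    """Stand-in for Synapse's exception of the same name."""


class HttpError(Exception):
    """Hypothetical stand-in for an HTTP error response."""

    def __init__(self, code: int) -> None:
        super().__init__(f"HTTP {code}")
        self.code = code


class FakeEventWriter:
    """Toy event persister that owns the room's partial-state flag."""

    def __init__(self) -> None:
        self.room_is_partial = True
        self.persisted: List[str] = []

    def persist(self, event_id: str, partial_state: bool) -> None:
        if partial_state and not self.room_is_partial:
            # Synapse detects this case as a foreign key constraint failure.
            raise PartialStateConflictError()
        self.persisted.append(event_id)


def replication_endpoint(
    writer: FakeEventWriter, event_id: str, partial_state: bool
) -> int:
    """Event writer side: report the conflict as a 409 status code."""
    try:
        writer.persist(event_id, partial_state)
    except PartialStateConflictError:
        return int(HTTPStatus.CONFLICT)
    return int(HTTPStatus.OK)


def send_over_replication(
    writer: FakeEventWriter, event_id: str, partial_state: bool
) -> None:
    """Requesting worker side: turn a 409 back into the exception."""
    status = replication_endpoint(writer, event_id, partial_state)
    if status == HTTPStatus.CONFLICT:
        raise PartialStateConflictError()


Hook = Optional[Callable[[], None]]


def compute_context_and_persist(
    writer: FakeEventWriter, event_id: str, during_processing: Hook = None
) -> None:
    # "Compute the event context": snapshot the partial-state flag.
    partial_state = writer.room_is_partial
    if during_processing is not None:
        # Simulates the room being un-partial stated mid-processing.
        during_processing()
    send_over_replication(writer, event_id, partial_state)


def on_receive_pdu(
    writer: FakeEventWriter, event_id: str, during_processing: Hook = None
) -> int:
    """Federation path: retry exactly once. Returns the number of attempts."""
    try:
        compute_context_and_persist(writer, event_id, during_processing)
        return 1
    except PartialStateConflictError:
        # The second attempt recomputes the context against full state.
        compute_context_and_persist(writer, event_id)
        return 2


def handle_new_client_event(
    writer: FakeEventWriter, event_id: str, during_processing: Hook = None
) -> None:
    """Client path: no retry; surface a 429 as a hint to try again."""
    try:
        compute_context_and_persist(writer, event_id, during_processing)
    except PartialStateConflictError:
        raise HttpError(int(HTTPStatus.TOO_MANY_REQUESTS))


# Usage: the room loses its partial-state flag while the event is in flight.
writer = FakeEventWriter()
attempts = on_receive_pdu(
    writer, "$event", lambda: setattr(writer, "room_is_partial", False)
)
print(attempts, writer.persisted)
```

A single retry is enough in this model because the flag only ever moves from partial to full: the second attempt snapshots full state, so the same conflict cannot recur.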
1 parent 6ba732f commit 68db233

10 files changed, +234 -74 lines changed

changelog.d/13100.misc

+1
@@ -0,0 +1 @@
+Faster room joins: Handle race between persisting an event and un-partial stating a room.

synapse/federation/federation_server.py

+15 -3
@@ -67,6 +67,7 @@
     ReplicationFederationSendEduRestServlet,
     ReplicationGetQueryRestServlet,
 )
+from synapse.storage.databases.main.events import PartialStateConflictError
 from synapse.storage.databases.main.lock import Lock
 from synapse.types import JsonDict, StateMap, get_domain_from_id
 from synapse.util import json_decoder, unwrapFirstError
@@ -882,9 +883,20 @@ async def _on_send_membership_event(
             logger.warning("%s", errmsg)
             raise SynapseError(403, errmsg, Codes.FORBIDDEN)

-        return await self._federation_event_handler.on_send_membership_event(
-            origin, event
-        )
+        try:
+            return await self._federation_event_handler.on_send_membership_event(
+                origin, event
+            )
+        except PartialStateConflictError:
+            # The room was un-partial stated while we were persisting the event.
+            # Try once more, with full state this time.
+            logger.info(
+                "Room %s was un-partial stated during `on_send_membership_event`, trying again.",
+                room_id,
+            )
+            return await self._federation_event_handler.on_send_membership_event(
+                origin, event
+            )

     async def on_event_auth(
         self, origin: str, room_id: str, event_id: str

synapse/handlers/federation.py

+25 -14
@@ -45,6 +45,7 @@
     FederationDeniedError,
     FederationError,
     HttpResponseException,
+    LimitExceededError,
     NotFoundError,
     RequestSendFailed,
     SynapseError,
@@ -64,6 +65,7 @@
     ReplicationCleanRoomRestServlet,
     ReplicationStoreRoomOnOutlierMembershipRestServlet,
 )
+from synapse.storage.databases.main.events import PartialStateConflictError
 from synapse.storage.databases.main.events_worker import EventRedactBehaviour
 from synapse.storage.state import StateFilter
 from synapse.types import JsonDict, StateMap, get_domain_from_id
@@ -549,15 +551,29 @@ async def do_invite_join(
                 # https://github.com/matrix-org/synapse/issues/12998
                 await self.store.store_partial_state_room(room_id, ret.servers_in_room)

-            max_stream_id = await self._federation_event_handler.process_remote_join(
-                origin,
-                room_id,
-                auth_chain,
-                state,
-                event,
-                room_version_obj,
-                partial_state=ret.partial_state,
-            )
+            try:
+                max_stream_id = (
+                    await self._federation_event_handler.process_remote_join(
+                        origin,
+                        room_id,
+                        auth_chain,
+                        state,
+                        event,
+                        room_version_obj,
+                        partial_state=ret.partial_state,
+                    )
+                )
+            except PartialStateConflictError as e:
+                # The homeserver was already in the room and it is no longer partial
+                # stated. We ought to be doing a local join instead. Turn the error into
+                # a 429, as a hint to the client to try again.
+                # TODO(faster_joins): `_should_perform_remote_join` suggests that we may
+                #   do a remote join for restricted rooms even if we have full state.
+                logger.error(
+                    "Room %s was un-partial stated while processing remote join.",
+                    room_id,
+                )
+                raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0)

             if ret.partial_state:
                 # Kick off the process of asynchronously fetching the state for this
@@ -1567,11 +1583,6 @@ async def _sync_partial_state_room(

                 # we raced against more events arriving with partial state. Go round
                 # the loop again. We've already logged a warning, so no need for more.
-                # TODO(faster_joins): there is still a race here, whereby incoming events which raced
-                #   with us will fail to be persisted after the call to `clear_partial_state_room` due to
-                #   having partial state.
-                #   https://github.com/matrix-org/synapse/issues/12988
-                #
                 continue

             events = await self.store.get_events_as_list(

synapse/handlers/federation_event.py

+41 -10
@@ -64,6 +64,7 @@
     ReplicationFederationSendEventsRestServlet,
 )
 from synapse.state import StateResolutionStore
+from synapse.storage.databases.main.events import PartialStateConflictError
 from synapse.storage.databases.main.events_worker import EventRedactBehaviour
 from synapse.storage.state import StateFilter
 from synapse.types import (
@@ -275,7 +276,16 @@ async def on_receive_pdu(self, origin: str, pdu: EventBase) -> None:
                         affected=pdu.event_id,
                     )

-        await self._process_received_pdu(origin, pdu, state_ids=None)
+        try:
+            await self._process_received_pdu(origin, pdu, state_ids=None)
+        except PartialStateConflictError:
+            # The room was un-partial stated while we were processing the PDU.
+            # Try once more, with full state this time.
+            logger.info(
+                "Room %s was un-partial stated while processing the PDU, trying again.",
+                room_id,
+            )
+            await self._process_received_pdu(origin, pdu, state_ids=None)

     async def on_send_membership_event(
         self, origin: str, event: EventBase
@@ -306,6 +316,9 @@ async def on_send_membership_event(

         Raises:
             SynapseError if the event is not accepted into the room
+            PartialStateConflictError if the room was un-partial stated in between
+                computing the state at the event and persisting it. The caller should
+                retry exactly once in this case.
         """
         logger.debug(
             "on_send_membership_event: Got event: %s, signatures: %s",
@@ -423,6 +436,8 @@ async def process_remote_join(

         Raises:
             SynapseError if the response is in some way invalid.
+            PartialStateConflictError if the homeserver is already in the room and it
+                has been un-partial stated.
         """
         create_event = None
         for e in state:
@@ -1084,10 +1099,14 @@ async def _process_received_pdu(

             state_ids: Normally None, but if we are handling a gap in the graph
                 (ie, we are missing one or more prev_events), the resolved state at the
-                event
+                event. Must not be partial state.

             backfilled: True if this is part of a historical batch of events (inhibits
                 notification to clients, and validation of device keys.)
+
+        PartialStateConflictError: if the room was un-partial stated in between
+            computing the state at the event and persisting it. The caller should retry
+            exactly once in this case. Will never be raised if `state_ids` is provided.
         """
         logger.debug("Processing event: %s", event)
         assert not event.internal_metadata.outlier
@@ -1933,6 +1952,9 @@ async def _run_push_actions_and_persist_event(
             event: The event itself.
             context: The event context.
             backfilled: True if the event was backfilled.
+
+        PartialStateConflictError: if attempting to persist a partial state event in
+            a room that has been un-partial stated.
         """
         # this method should not be called on outliers (those code paths call
         # persist_events_and_notify directly.)
@@ -1985,6 +2007,10 @@ async def persist_events_and_notify(

         Returns:
             The stream ID after which all events have been persisted.
+
+        Raises:
+            PartialStateConflictError: if attempting to persist a partial state event in
+                a room that has been un-partial stated.
         """
         if not event_and_contexts:
             return self._store.get_room_max_stream_ordering()
@@ -1993,14 +2019,19 @@ async def persist_events_and_notify(
         if instance != self._instance_name:
             # Limit the number of events sent over replication. We choose 200
             # here as that is what we default to in `max_request_body_size(..)`
-            for batch in batch_iter(event_and_contexts, 200):
-                result = await self._send_events(
-                    instance_name=instance,
-                    store=self._store,
-                    room_id=room_id,
-                    event_and_contexts=batch,
-                    backfilled=backfilled,
-                )
+            try:
+                for batch in batch_iter(event_and_contexts, 200):
+                    result = await self._send_events(
+                        instance_name=instance,
+                        store=self._store,
+                        room_id=room_id,
+                        event_and_contexts=batch,
+                        backfilled=backfilled,
+                    )
+            except SynapseError as e:
+                if e.code == HTTPStatus.CONFLICT:
+                    raise PartialStateConflictError()
+                raise
             return result["max_stream_id"]
         else:
             assert self._storage_controllers.persistence
synapse/handlers/message.py

+53 -26
@@ -37,6 +37,7 @@
     AuthError,
     Codes,
     ConsentNotGivenError,
+    LimitExceededError,
     NotFoundError,
     ShadowBanError,
     SynapseError,
@@ -53,6 +54,7 @@
 from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.http.send_event import ReplicationSendEventRestServlet
+from synapse.storage.databases.main.events import PartialStateConflictError
 from synapse.storage.databases.main.events_worker import EventRedactBehaviour
 from synapse.storage.state import StateFilter
 from synapse.types import (
@@ -1250,6 +1252,8 @@ async def handle_new_client_event(

         Raises:
             ShadowBanError if the requester has been shadow-banned.
+            SynapseError(503) if attempting to persist a partial state event in
+                a room that has been un-partial stated.
         """
         extra_users = extra_users or []

@@ -1300,24 +1304,35 @@ async def handle_new_client_event(

         # We now persist the event (and update the cache in parallel, since we
         # don't want to block on it).
-        result, _ = await make_deferred_yieldable(
-            gather_results(
-                (
-                    run_in_background(
-                        self._persist_event,
-                        requester=requester,
-                        event=event,
-                        context=context,
-                        ratelimit=ratelimit,
-                        extra_users=extra_users,
+        try:
+            result, _ = await make_deferred_yieldable(
+                gather_results(
+                    (
+                        run_in_background(
+                            self._persist_event,
+                            requester=requester,
+                            event=event,
+                            context=context,
+                            ratelimit=ratelimit,
+                            extra_users=extra_users,
+                        ),
+                        run_in_background(
+                            self.cache_joined_hosts_for_event, event, context
+                        ).addErrback(
+                            log_failure, "cache_joined_hosts_for_event failed"
+                        ),
                     ),
-                    run_in_background(
-                        self.cache_joined_hosts_for_event, event, context
-                    ).addErrback(log_failure, "cache_joined_hosts_for_event failed"),
-                ),
-                consumeErrors=True,
+                    consumeErrors=True,
+                )
+            ).addErrback(unwrapFirstError)
+        except PartialStateConflictError as e:
+            # The event context needs to be recomputed.
+            # Turn the error into a 429, as a hint to the client to try again.
+            logger.info(
+                "Room %s was un-partial stated while persisting client event.",
+                event.room_id,
             )
-        ).addErrback(unwrapFirstError)
+            raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0)

         return result

@@ -1332,6 +1347,9 @@ async def _persist_event(
         """Actually persists the event. Should only be called by
         `handle_new_client_event`, and see its docstring for documentation of
         the arguments.
+
+        PartialStateConflictError: if attempting to persist a partial state event in
+            a room that has been un-partial stated.
         """

         # Skip push notification actions for historical messages
@@ -1348,16 +1366,21 @@ async def _persist_event(
             # If we're a worker we need to hit out to the master.
             writer_instance = self._events_shard_config.get_instance(event.room_id)
             if writer_instance != self._instance_name:
-                result = await self.send_event(
-                    instance_name=writer_instance,
-                    event_id=event.event_id,
-                    store=self.store,
-                    requester=requester,
-                    event=event,
-                    context=context,
-                    ratelimit=ratelimit,
-                    extra_users=extra_users,
-                )
+                try:
+                    result = await self.send_event(
+                        instance_name=writer_instance,
+                        event_id=event.event_id,
+                        store=self.store,
+                        requester=requester,
+                        event=event,
+                        context=context,
+                        ratelimit=ratelimit,
+                        extra_users=extra_users,
+                    )
+                except SynapseError as e:
+                    if e.code == HTTPStatus.CONFLICT:
+                        raise PartialStateConflictError()
+                    raise
                 stream_id = result["stream_id"]
                 event_id = result["event_id"]
                 if event_id != event.event_id:
@@ -1485,6 +1508,10 @@ async def persist_and_notify_client_event(
             The persisted event. This may be different than the given event if
             it was de-duplicated (e.g. because we had already persisted an
             event with the same transaction ID.)
+
+        Raises:
+            PartialStateConflictError: if attempting to persist a partial state event in
+                a room that has been un-partial stated.
         """
         extra_users = extra_users or []

synapse/replication/http/federation.py

+3
@@ -60,6 +60,9 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
         {
             "max_stream_id": 32443,
         }
+
+    Responds with a 409 when a `PartialStateConflictError` is raised due to an event
+    context that needs to be recomputed due to the un-partial stating of a room.
     """

     NAME = "fed_send_events"

synapse/replication/http/send_event.py

+3
@@ -59,6 +59,9 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):

         { "stream_id": 12345, "event_id": "$abcdef..." }

+    Responds with a 409 when a `PartialStateConflictError` is raised due to an event
+    context that needs to be recomputed due to the un-partial stating of a room.
+
     The returned event ID may not match the sent event if it was deduplicated.
     """

synapse/storage/controllers/persist_events.py

+12
@@ -315,6 +315,10 @@ async def persist_events(
             if they were deduplicated due to an event already existing that
             matched the transaction ID; the existing event is returned in such
             a case.
+
+        Raises:
+            PartialStateConflictError: if attempting to persist a partial state event in
+                a room that has been un-partial stated.
         """
         partitioned: Dict[str, List[Tuple[EventBase, EventContext]]] = {}
         for event, ctx in events_and_contexts:
@@ -363,6 +367,10 @@ async def persist_event(
             latest persisted event. The returned event may not match the given
             event if it was deduplicated due to an existing event matching the
             transaction ID.
+
+        Raises:
+            PartialStateConflictError: if attempting to persist a partial state event in
+                a room that has been un-partial stated.
         """
         # add_to_queue returns a map from event ID to existing event ID if the
         # event was deduplicated. (The dict may also include other entries if
@@ -453,6 +461,10 @@ async def _persist_event_batch(
         Returns:
             A dictionary of event ID to event ID we didn't persist as we already
             had another event persisted with the same TXN ID.
+
+        Raises:
+            PartialStateConflictError: if attempting to persist a partial state event in
+                a room that has been un-partial stated.
         """
         replaced_events: Dict[str, str] = {}
         if not events_and_contexts:
