Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Retry on PartialStateConflictError
Browse files Browse the repository at this point in the history
  • Loading branch information
Mathieu Velten committed Dec 9, 2022
1 parent e28e4a7 commit 78bdd06
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 76 deletions.
41 changes: 15 additions & 26 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -1417,34 +1417,23 @@ async def handle_new_client_event(
# We now persist the event (and update the cache in parallel, since we
# don't want to block on it).
event, context = events_and_context[0]
try:
result, _ = await make_deferred_yieldable(
gather_results(
(
run_in_background(
self._persist_events,
requester=requester,
events_and_context=events_and_context,
ratelimit=ratelimit,
extra_users=extra_users,
),
run_in_background(
self.cache_joined_hosts_for_events, events_and_context
).addErrback(
log_failure, "cache_joined_hosts_for_event failed"
),
result, _ = await make_deferred_yieldable(
gather_results(
(
run_in_background(
self._persist_events,
requester=requester,
events_and_context=events_and_context,
ratelimit=ratelimit,
extra_users=extra_users,
),
consumeErrors=True,
)
).addErrback(unwrapFirstError)
except PartialStateConflictError as e:
# The event context needs to be recomputed.
# Turn the error into a 429, as a hint to the client to try again.
logger.info(
"Room %s was un-partial stated while persisting client event.",
event.room_id,
run_in_background(
self.cache_joined_hosts_for_events, events_and_context
).addErrback(log_failure, "cache_joined_hosts_for_event failed"),
),
consumeErrors=True,
)
raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0)
).addErrback(unwrapFirstError)

return result

Expand Down
118 changes: 68 additions & 50 deletions synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
from synapse.logging import opentracing
from synapse.module_api import NOT_SPAM
from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.storage.state import StateFilter
from synapse.types import (
JsonDict,
Expand Down Expand Up @@ -392,60 +393,77 @@ async def _local_membership_update(
event_pos = await self.store.get_position_for_event(existing_event_id)
return existing_event_id, event_pos.stream

event, context = await self.event_creation_handler.create_event(
requester,
{
"type": EventTypes.Member,
"content": content,
"room_id": room_id,
"sender": requester.user.to_string(),
"state_key": user_id,
# For backwards compatibility:
"membership": membership,
"origin_server_ts": origin_server_ts,
},
txn_id=txn_id,
allow_no_prev_events=allow_no_prev_events,
prev_event_ids=prev_event_ids,
state_event_ids=state_event_ids,
depth=depth,
require_consent=require_consent,
outlier=outlier,
historical=historical,
)

prev_state_ids = await context.get_prev_state_ids(
StateFilter.from_types([(EventTypes.Member, None)])
)
event_handled = False
while not event_handled:
try:
event, context = await self.event_creation_handler.create_event(
requester,
{
"type": EventTypes.Member,
"content": content,
"room_id": room_id,
"sender": requester.user.to_string(),
"state_key": user_id,
# For backwards compatibility:
"membership": membership,
"origin_server_ts": origin_server_ts,
},
txn_id=txn_id,
allow_no_prev_events=allow_no_prev_events,
prev_event_ids=prev_event_ids,
state_event_ids=state_event_ids,
depth=depth,
require_consent=require_consent,
outlier=outlier,
historical=historical,
)

prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
prev_state_ids = await context.get_prev_state_ids(
StateFilter.from_types([(EventTypes.Member, None)])
)

if event.membership == Membership.JOIN:
newly_joined = True
if prev_member_event_id:
prev_member_event = await self.store.get_event(prev_member_event_id)
newly_joined = prev_member_event.membership != Membership.JOIN

# Only rate-limit if the user actually joined the room, otherwise we'll end
# up blocking profile updates.
if newly_joined and ratelimit:
await self._join_rate_limiter_local.ratelimit(requester)
await self._join_rate_per_room_limiter.ratelimit(
requester, key=room_id, update=False
prev_member_event_id = prev_state_ids.get(
(EventTypes.Member, user_id), None
)
with opentracing.start_active_span("handle_new_client_event"):
result_event = await self.event_creation_handler.handle_new_client_event(
requester,
events_and_context=[(event, context)],
extra_users=[target],
ratelimit=ratelimit,
)

if event.membership == Membership.LEAVE:
if prev_member_event_id:
prev_member_event = await self.store.get_event(prev_member_event_id)
if prev_member_event.membership == Membership.JOIN:
await self._user_left_room(target, room_id)
if event.membership == Membership.JOIN:
newly_joined = True
if prev_member_event_id:
prev_member_event = await self.store.get_event(
prev_member_event_id
)
newly_joined = prev_member_event.membership != Membership.JOIN

# Only rate-limit if the user actually joined the room, otherwise we'll end
# up blocking profile updates.
if newly_joined and ratelimit:
await self._join_rate_limiter_local.ratelimit(requester)
await self._join_rate_per_room_limiter.ratelimit(
requester, key=room_id, update=False
)
with opentracing.start_active_span("handle_new_client_event"):
result_event = (
await self.event_creation_handler.handle_new_client_event(
requester,
events_and_context=[(event, context)],
extra_users=[target],
ratelimit=ratelimit,
)
)

if event.membership == Membership.LEAVE:
if prev_member_event_id:
prev_member_event = await self.store.get_event(
prev_member_event_id
)
if prev_member_event.membership == Membership.JOIN:
await self._user_left_room(target, room_id)

event_handled = True
except PartialStateConflictError as e:
# persisting couldn't happen because the room got un-partial in the meantime
# and context needs to be recomputed, so let's do so
pass

# we know it was persisted, so should have a stream ordering
assert result_event.internal_metadata.stream_ordering
Expand Down

0 comments on commit 78bdd06

Please sign in to comment.