Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Reuse RoomShutdownHandler.shutdown_room
Browse files Browse the repository at this point in the history
  • Loading branch information
dklimpel committed Nov 2, 2021
1 parent 5b24839 commit 3f9a4f4
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 154 deletions.
173 changes: 20 additions & 153 deletions synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@

from twisted.python.failure import Failure

from synapse.api.constants import EventTypes, Membership, RoomCreationPreset
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.api.filtering import Filter
from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.state import StateFilter
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, Requester, create_requester
from synapse.types import JsonDict, Requester
from synapse.util.async_helpers import ReadWriteLock
from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
Expand Down Expand Up @@ -82,12 +82,6 @@ class PaginationHandler:
paginating during a purge.
"""

DEFAULT_MESSAGE = (
"Sharing illegal content on this server is not permitted and rooms in"
" violation will be blocked."
)
DEFAULT_ROOM_NAME = "Content Violation Notification"

def __init__(self, hs: "HomeServer"):
self.hs = hs
self.auth = hs.get_auth()
Expand All @@ -96,11 +90,7 @@ def __init__(self, hs: "HomeServer"):
self.state_store = self.storage.state
self.clock = hs.get_clock()
self._server_name = hs.hostname

self._room_member_handler = hs.get_room_member_handler()
self._room_creation_handler = hs.get_room_creation_handler()
self._replication = hs.get_replication_data_handler()
self._event_creation_handler = hs.get_event_creation_handler()
self._room_shutdown_handler = hs.get_room_shutdown_handler()

self.pagination_lock = ReadWriteLock()
self._purges_in_progress_by_room: Set[str] = set()
Expand Down Expand Up @@ -562,151 +552,25 @@ async def _shutdown_and_purge_room(
new_room_id: A string representing the room ID of the new room.
"""

if not new_room_name:
new_room_name = self.DEFAULT_ROOM_NAME
if not message:
message = self.DEFAULT_MESSAGE

self._purges_in_progress_by_room.add(room_id)
try:
with await self.pagination_lock.write(room_id):

# This will work even if the room is already blocked, but that is
# desirable in case the first attempt at blocking the room failed below.
if block:
await self.store.block_room(room_id, requester_user_id)

if new_room_user_id is not None:
room_creator_requester = create_requester(
new_room_user_id, authenticated_entity=requester_user_id
)

info, stream_id = await self._room_creation_handler.create_room(
room_creator_requester,
config={
"preset": RoomCreationPreset.PUBLIC_CHAT,
"name": new_room_name,
"power_level_content_override": {"users_default": -10},
},
ratelimit=False,
)
new_room_id = info["room_id"]

logger.info(
"[shutdown_and_purge] Shutting down room %r, joining to new room: %r",
room_id,
new_room_id,
)

# We now wait for the create room to come back in via replication so
# that we can assume that all the joins/invites have propagated before
# we try and auto join below.
await self._replication.wait_for_stream_position(
self.hs.config.worker.events_shard_config.get_instance(
new_room_id
),
"events",
stream_id,
)
else:
new_room_id = None
logger.info("[shutdown_and_purge] Shutting down room %r", room_id)

self._purges_by_id[room_id].status = PurgeStatus.STATUS_REMOVE_MEMBERS

users = await self.store.get_users_in_room(room_id)
kicked_users = []
failed_to_kick_users = []
for user_id in users:
if not self.hs.is_mine_id(user_id):
continue

logger.info(
"[shutdown_and_purge] Kicking %r from %r...", user_id, room_id
)

try:
# Kick users from room
target_requester = create_requester(
user_id, authenticated_entity=requester_user_id
)
_, stream_id = await self._room_member_handler.update_membership(
requester=target_requester,
target=target_requester.user,
room_id=room_id,
action=Membership.LEAVE,
content={},
ratelimit=False,
require_consent=False,
)

# Wait for leave to come in over replication before trying to forget.
await self._replication.wait_for_stream_position(
self.hs.config.worker.events_shard_config.get_instance(
room_id
),
"events",
stream_id,
)

await self._room_member_handler.forget(
target_requester.user, room_id
)

# Join users to new room
if new_room_user_id:
await self._room_member_handler.update_membership(
requester=target_requester,
target=target_requester.user,
room_id=new_room_id,
action=Membership.JOIN,
content={},
ratelimit=False,
require_consent=False,
)

kicked_users.append(user_id)
except Exception:
logger.exception(
"[shutdown_and_purge] Failed to leave old room and join new room for %r",
user_id,
)
failed_to_kick_users.append(user_id)

self._purges_by_id[
room_id
].result = await self._room_shutdown_handler.shutdown_room(
room_id=room_id,
requester_user_id=requester_user_id,
new_room_user_id=new_room_user_id,
new_room_name=new_room_name,
message=message,
block=block,
)
self._purges_by_id[room_id].status = PurgeStatus.STATUS_ACTIVE

# Send message in new room and move aliases
if new_room_user_id:
await self._event_creation_handler.create_and_send_nonmember_event(
room_creator_requester,
{
"type": "m.room.message",
"content": {"body": message, "msgtype": "m.text"},
"room_id": new_room_id,
"sender": new_room_user_id,
},
ratelimit=False,
)

aliases_for_room = await self.store.get_aliases_for_room(room_id)

await self.store.update_aliases_for_room(
room_id, new_room_id, requester_user_id
)
else:
aliases_for_room = []

self._purges_by_id[room_id].result = {
"kicked_users": kicked_users,
"failed_to_kick_users": failed_to_kick_users,
"local_aliases": aliases_for_room,
"new_room_id": new_room_id,
}

if purge:
logger.info(
"[shutdown_and_purge] starting purge room_id %s", room_id
)
logger.info("starting purge room_id %s", room_id)
self._purges_by_id[room_id].status = PurgeStatus.STATUS_ACTIVE

# first check that we have no users in this room
Expand All @@ -721,12 +585,12 @@ async def _shutdown_and_purge_room(

await self.storage.purge_events.purge_room(room_id)

logger.info("[shutdown_and_purge] complete")
logger.info("complete")
self._purges_by_id[room_id].status = PurgeStatus.STATUS_COMPLETE
except Exception:
f = Failure()
logger.error(
"[shutdown_and_purge] failed",
"failed",
exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore
)
self._purges_by_id[room_id].status = PurgeStatus.STATUS_FAILED
Expand Down Expand Up @@ -785,6 +649,9 @@ def start_shutdown_and_purge_room(
400, "History purge already in progress for %s" % (room_id,)
)

# This check is double to `RoomShutdownHandler.shutdown_room`
# But here the requester get a direct response / error with HTTP request
# and do not have to check the purge status
if new_room_user_id is not None:
if not self.hs.is_mine_id(new_room_user_id):
raise SynapseError(
Expand All @@ -793,7 +660,7 @@ def start_shutdown_and_purge_room(

# we log the purge_id here so that it can be tied back to the
# request id in the log lines.
logger.info("[shutdown_and_purge] starting shutdown room_id %s", room_id)
logger.info("[_shutdown_and_purge_room] starting shutdown room_id %s", room_id)

self._purges_by_id[room_id] = PurgeStatus()
run_as_background_process(
Expand Down
4 changes: 3 additions & 1 deletion synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1278,7 +1278,9 @@ def get_current_key_for_room(self, room_id: str) -> Awaitable[str]:
class RoomShutdownHandler:
"""This handles synchronous room shutdowns and is part of the delete room v1 API.
It will become deprecated in the future.
The handler for asynchronous shudowns is part of the PaginationHandler.
The handler for asynchronous shudowns is part of the `PaginationHandler`.
If this handler is removed, `shutdown_room` must to be migrated to `PaginationHandler`
"""

DEFAULT_MESSAGE = (
Expand Down

0 comments on commit 3f9a4f4

Please sign in to comment.