Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for moving /push_rules off of main process #17037

Merged
merged 5 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17037.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for moving `/push_rules` off of main process.
7 changes: 7 additions & 0 deletions docs/workers.md
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,13 @@ the stream writer for the `presence` stream:

^/_matrix/client/(api/v1|r0|v3|unstable)/presence/

##### The `push` stream

The following endpoints should be routed directly to the worker configured as
the stream writer for the `push` stream:

^/_matrix/client/(api/v1|r0|v3|unstable)/push_rules/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WAIT it's actually /pushrules/

Suggested change
^/_matrix/client/(api/v1|r0|v3|unstable)/push_rules/
^/_matrix/client/(api/v1|r0|v3|unstable)/pushrules/

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh good spot


#### Restrict outbound federation traffic to a specific set of workers

The
Expand Down
9 changes: 3 additions & 6 deletions synapse/_scripts/synapse_port_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
)
from synapse.notifier import ReplicationNotifier
from synapse.storage.database import DatabasePool, LoggingTransaction, make_conn
from synapse.storage.databases.main import FilteringWorkerStore, PushRuleStore
from synapse.storage.databases.main import FilteringWorkerStore
from synapse.storage.databases.main.account_data import AccountDataWorkerStore
from synapse.storage.databases.main.client_ips import ClientIpBackgroundUpdateStore
from synapse.storage.databases.main.deviceinbox import DeviceInboxBackgroundUpdateStore
Expand All @@ -77,10 +77,8 @@
)
from synapse.storage.databases.main.presence import PresenceBackgroundUpdateStore
from synapse.storage.databases.main.profile import ProfileWorkerStore
from synapse.storage.databases.main.pusher import (
PusherBackgroundUpdatesStore,
PusherWorkerStore,
)
from synapse.storage.databases.main.push_rule import PusherWorkerStore
from synapse.storage.databases.main.pusher import PusherBackgroundUpdatesStore
from synapse.storage.databases.main.receipts import ReceiptsBackgroundUpdateStore
from synapse.storage.databases.main.registration import (
RegistrationBackgroundUpdateStore,
Expand Down Expand Up @@ -245,7 +243,6 @@ class Store(
AccountDataWorkerStore,
FilteringWorkerStore,
ProfileWorkerStore,
PushRuleStore,
PusherWorkerStore,
PusherBackgroundUpdatesStore,
PresenceBackgroundUpdateStore,
Expand Down
12 changes: 12 additions & 0 deletions synapse/config/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ class WriterLocations:
can only be a single instance.
presence: The instances that write to the presence stream. Currently
can only be a single instance.
push: The instances that write to the push stream. Currently
can only be a single instance.
"""

events: List[str] = attr.ib(
Expand All @@ -182,6 +184,10 @@ class WriterLocations:
default=["master"],
converter=_instance_to_list_converter,
)
push: List[str] = attr.ib(
default=["master"],
converter=_instance_to_list_converter,
)
sandhose marked this conversation as resolved.
Show resolved Hide resolved


@attr.s(auto_attribs=True)
Expand Down Expand Up @@ -341,6 +347,7 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
"account_data",
"receipts",
"presence",
"push",
):
instances = _instance_to_list_converter(getattr(self.writers, stream))
for instance in instances:
Expand Down Expand Up @@ -378,6 +385,11 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
"Must only specify one instance to handle `presence` messages."
)

if len(self.writers.push) != 1:
raise ConfigError(
"Must only specify one instance to handle `push` messages."
)

self.events_shard_config = RoutableShardedWorkerHandlingConfig(
self.writers.events
)
Expand Down
19 changes: 16 additions & 3 deletions synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
from synapse.logging import opentracing
from synapse.metrics import event_processing_positions
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.push import ReplicationCopyPusherRestServlet
from synapse.storage.databases.main.state_deltas import StateDelta
from synapse.types import (
JsonDict,
Expand Down Expand Up @@ -181,6 +182,10 @@ def __init__(self, hs: "HomeServer"):
hs.config.server.forgotten_room_retention_period
)

self._is_push_writer = hs.get_instance_name() in hs.config.worker.writers.push
self._push_writer = hs.config.worker.writers.push[0]
self._copy_push_client = ReplicationCopyPusherRestServlet.make_client(hs)

def _on_user_joined_room(self, event_id: str, room_id: str) -> None:
"""Notify the rate limiter that a room join has occurred.
Expand Down Expand Up @@ -1301,9 +1306,17 @@ async def copy_user_state_on_room_upgrade(
old_room_id, new_room_id, user_id
)
# Copy over push rules
await self.store.copy_push_rules_from_room_to_room_for_user(
old_room_id, new_room_id, user_id
)
if self._is_push_writer:
await self.store.copy_push_rules_from_room_to_room_for_user(
old_room_id, new_room_id, user_id
)
else:
await self._copy_push_client(
instance_name=self._push_writer,
user_id=user_id,
old_room_id=old_room_id,
new_room_id=new_room_id,
)
except Exception:
logger.exception(
"Error copying tags and/or push rules from rooms %s to %s for user %s. "
Expand Down
41 changes: 41 additions & 0 deletions synapse/replication/http/push.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,46 @@ async def _handle_request( # type: ignore[override]
return 200, {}


class ReplicationCopyPusherRestServlet(ReplicationEndpoint):
"""Copies push rules from an old room to new room.

Request format:

POST /_synapse/replication/copy_push_rules/:user_id/:old_room_id/:new_room_id

{}

"""

NAME = "copy_push_rules"
PATH_ARGS = ("user_id", "old_room_id", "new_room_id")
CACHE = False

def __init__(self, hs: "HomeServer"):
super().__init__(hs)

self._store = hs.get_datastores().main

@staticmethod
async def _serialize_payload(user_id: str, old_room_id: str, new_room_id: str) -> JsonDict: # type: ignore[override]
return {}

async def _handle_request( # type: ignore[override]
self,
request: Request,
content: JsonDict,
user_id: str,
old_room_id: str,
new_room_id: str,
) -> Tuple[int, JsonDict]:

await self._store.copy_push_rules_from_room_to_room_for_user(
old_room_id, new_room_id, user_id
)

return 200, {}


def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
ReplicationRemovePusherRestServlet(hs).register(http_server)
ReplicationCopyPusherRestServlet(hs).register(http_server)
7 changes: 7 additions & 0 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
FederationStream,
PresenceFederationStream,
PresenceStream,
PushRulesStream,
ReceiptsStream,
Stream,
ToDeviceStream,
Expand Down Expand Up @@ -178,6 +179,12 @@ def __init__(self, hs: "HomeServer"):

continue

if isinstance(stream, PushRulesStream):
if hs.get_instance_name() in hs.config.worker.writers.push:
self._streams_to_replicate.append(stream)

continue

# Only add any other streams if we're on master.
if hs.config.worker.worker_app is not None:
continue
Expand Down
6 changes: 3 additions & 3 deletions synapse/rest/client/push_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ def __init__(self, hs: "HomeServer"):
self.auth = hs.get_auth()
self.store = hs.get_datastores().main
self.notifier = hs.get_notifier()
self._is_worker = hs.config.worker.worker_app is not None
self._is_push_worker = hs.get_instance_name() in hs.config.worker.writers.push
self._push_rules_handler = hs.get_push_rules_handler()
self._push_rule_linearizer = Linearizer(name="push_rules")

async def on_PUT(self, request: SynapseRequest, path: str) -> Tuple[int, JsonDict]:
if self._is_worker:
if not self._is_push_worker:
raise Exception("Cannot handle PUT /push_rules on worker")

requester = await self.auth.get_user_by_req(request)
Expand Down Expand Up @@ -137,7 +137,7 @@ async def handle_put(
async def on_DELETE(
self, request: SynapseRequest, path: str
) -> Tuple[int, JsonDict]:
if self._is_worker:
if not self._is_push_worker:
raise Exception("Cannot handle DELETE /push_rules on worker")

requester = await self.auth.get_user_by_req(request)
Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
from .presence import PresenceStore
from .profile import ProfileStore
from .purge_events import PurgeEventsStore
from .push_rule import PushRuleStore
from .push_rule import PushRulesWorkerStore
from .pusher import PusherStore
from .receipts import ReceiptsStore
from .registration import RegistrationStore
Expand Down Expand Up @@ -130,7 +130,6 @@ class DataStore(
RejectionsStore,
FilteringWorkerStore,
PusherStore,
PushRuleStore,
ApplicationServiceTransactionStore,
EventPushActionsStore,
ServerMetricsStore,
Expand All @@ -140,6 +139,7 @@ class DataStore(
SearchStore,
TagsStore,
AccountDataStore,
PushRulesWorkerStore,
StreamWorkerStore,
OpenIdStore,
ClientIpWorkerStore,
Expand Down
Loading
Loading