Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Add a new module API to update user presence state. (#16544)
Browse files Browse the repository at this point in the history
This adds a module API which allows a module to update a user's
presence state/status message. This is useful for controlling presence
from an external system.

To fully control presence from the module the presence.enabled config
parameter gains a new state of "untracked" which disables internal tracking
of presence changes via user actions, etc. Only updates from the module will
be persisted and sent down sync properly).
  • Loading branch information
clokep authored Oct 26, 2023
1 parent 9407d5b commit 85e5f2d
Show file tree
Hide file tree
Showing 12 changed files with 221 additions and 53 deletions.
1 change: 1 addition & 0 deletions changelog.d/16544.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a new module API for controller presence.
7 changes: 7 additions & 0 deletions docs/usage/configuration/config_documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,13 @@ Example configuration:
presence:
enabled: false
```

`enabled` can also be set to a special value of "untracked" which ignores updates
received via clients and federation, while still accepting updates from the
[module API](../../modules/index.md).

*The "untracked" option was added in Synapse 1.96.0.*

---
### `require_auth_for_profile_requests`

Expand Down
11 changes: 8 additions & 3 deletions synapse/config/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,9 +368,14 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:

# Whether to enable user presence.
presence_config = config.get("presence") or {}
self.use_presence = presence_config.get("enabled")
if self.use_presence is None:
self.use_presence = config.get("use_presence", True)
presence_enabled = presence_config.get("enabled")
if presence_enabled is None:
presence_enabled = config.get("use_presence", True)

# Whether presence is enabled *at all*.
self.presence_enabled = bool(presence_enabled)
# Whether to internally track presence, requires that presence is enabled,
self.track_presence = self.presence_enabled and presence_enabled != "untracked"

# Custom presence router module
# This is the legacy way of configuring it (the config should now be put in the modules section)
Expand Down
2 changes: 1 addition & 1 deletion synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1395,7 +1395,7 @@ def register_instances_for_edu(
self._edu_type_to_instance[edu_type] = instance_names

async def on_edu(self, edu_type: str, origin: str, content: dict) -> None:
if not self.config.server.use_presence and edu_type == EduTypes.PRESENCE:
if not self.config.server.track_presence and edu_type == EduTypes.PRESENCE:
return

# Check if we have a handler on this instance
Expand Down
2 changes: 1 addition & 1 deletion synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -844,7 +844,7 @@ async def send_presence_to_destinations(
destinations (list[str])
"""

if not states or not self.hs.config.server.use_presence:
if not states or not self.hs.config.server.track_presence:
# No-op if presence is disabled.
return

Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ async def _room_initial_sync_joined(

async def get_presence() -> List[JsonDict]:
# If presence is disabled, return an empty list
if not self.hs.config.server.use_presence:
if not self.hs.config.server.presence_enabled:
return []

states = await presence_handler.get_states(
Expand Down
78 changes: 47 additions & 31 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ def __init__(self, hs: "HomeServer"):
self.state = hs.get_state_handler()
self.is_mine_id = hs.is_mine_id

self._presence_enabled = hs.config.server.use_presence
self._presence_enabled = hs.config.server.presence_enabled
self._track_presence = hs.config.server.track_presence

self._federation = None
if hs.should_send_federation():
Expand Down Expand Up @@ -512,7 +513,7 @@ def __init__(self, hs: "HomeServer"):
)

async def _on_shutdown(self) -> None:
if self._presence_enabled:
if self._track_presence:
self.hs.get_replication_command_handler().send_command(
ClearUserSyncsCommand(self.instance_id)
)
Expand All @@ -524,7 +525,7 @@ def send_user_sync(
is_syncing: bool,
last_sync_ms: int,
) -> None:
if self._presence_enabled:
if self._track_presence:
self.hs.get_replication_command_handler().send_user_sync(
self.instance_id, user_id, device_id, is_syncing, last_sync_ms
)
Expand Down Expand Up @@ -571,7 +572,7 @@ async def user_syncing(
Called by the sync and events servlets to record that a user has connected to
this worker and is waiting for some events.
"""
if not affect_presence or not self._presence_enabled:
if not affect_presence or not self._track_presence:
return _NullContextManager()

# Note that this causes last_active_ts to be incremented which is not
Expand Down Expand Up @@ -702,8 +703,8 @@ async def set_state(

user_id = target_user.to_string()

# If presence is disabled, no-op
if not self._presence_enabled:
# If tracking of presence is disabled, no-op
if not self._track_presence:
return

# Proxy request to instance that writes presence
Expand All @@ -723,7 +724,7 @@ async def bump_presence_active_time(
with the app.
"""
# If presence is disabled, no-op
if not self._presence_enabled:
if not self._track_presence:
return

# Proxy request to instance that writes presence
Expand Down Expand Up @@ -760,7 +761,7 @@ def __init__(self, hs: "HomeServer"):
] = {}

now = self.clock.time_msec()
if self._presence_enabled:
if self._track_presence:
for state in self.user_to_current_state.values():
# Create a psuedo-device to properly handle time outs. This will
# be overridden by any "real" devices within SYNC_ONLINE_TIMEOUT.
Expand Down Expand Up @@ -831,14 +832,17 @@ def __init__(self, hs: "HomeServer"):

self.external_sync_linearizer = Linearizer(name="external_sync_linearizer")

if self._presence_enabled:
if self._track_presence:
# Start a LoopingCall in 30s that fires every 5s.
# The initial delay is to allow disconnected clients a chance to
# reconnect before we treat them as offline.
self.clock.call_later(
30, self.clock.looping_call, self._handle_timeouts, 5000
)

# Presence information is persisted, whether or not it is being tracked
# internally.
if self._presence_enabled:
self.clock.call_later(
60,
self.clock.looping_call,
Expand All @@ -854,7 +858,7 @@ def __init__(self, hs: "HomeServer"):
)

# Used to handle sending of presence to newly joined users/servers
if self._presence_enabled:
if self._track_presence:
self.notifier.add_replication_callback(self.notify_new_event)

# Presence is best effort and quickly heals itself, so lets just always
Expand Down Expand Up @@ -905,7 +909,9 @@ async def _persist_unpersisted_changes(self) -> None:
)

async def _update_states(
self, new_states: Iterable[UserPresenceState], force_notify: bool = False
self,
new_states: Iterable[UserPresenceState],
force_notify: bool = False,
) -> None:
"""Updates presence of users. Sets the appropriate timeouts. Pokes
the notifier and federation if and only if the changed presence state
Expand Down Expand Up @@ -943,7 +949,7 @@ async def _update_states(
for new_state in new_states:
user_id = new_state.user_id

# Its fine to not hit the database here, as the only thing not in
# It's fine to not hit the database here, as the only thing not in
# the current state cache are OFFLINE states, where the only field
# of interest is last_active which is safe enough to assume is 0
# here.
Expand All @@ -957,6 +963,9 @@ async def _update_states(
is_mine=self.is_mine_id(user_id),
wheel_timer=self.wheel_timer,
now=now,
# When overriding disabled presence, don't kick off all the
# wheel timers.
persist=not self._track_presence,
)

if force_notify:
Expand Down Expand Up @@ -1072,7 +1081,7 @@ async def bump_presence_active_time(
with the app.
"""
# If presence is disabled, no-op
if not self._presence_enabled:
if not self._track_presence:
return

user_id = user.to_string()
Expand Down Expand Up @@ -1124,7 +1133,7 @@ async def user_syncing(
client that is being used by a user.
presence_state: The presence state indicated in the sync request
"""
if not affect_presence or not self._presence_enabled:
if not affect_presence or not self._track_presence:
return _NullContextManager()

curr_sync = self._user_device_to_num_current_syncs.get((user_id, device_id), 0)
Expand Down Expand Up @@ -1284,7 +1293,7 @@ async def _persist_and_notify(self, states: List[UserPresenceState]) -> None:

async def incoming_presence(self, origin: str, content: JsonDict) -> None:
"""Called when we receive a `m.presence` EDU from a remote server."""
if not self._presence_enabled:
if not self._track_presence:
return

now = self.clock.time_msec()
Expand Down Expand Up @@ -1359,7 +1368,7 @@ async def set_state(
raise SynapseError(400, "Invalid presence state")

# If presence is disabled, no-op
if not self._presence_enabled:
if not self._track_presence:
return

user_id = target_user.to_string()
Expand Down Expand Up @@ -2118,6 +2127,7 @@ def handle_update(
is_mine: bool,
wheel_timer: WheelTimer,
now: int,
persist: bool,
) -> Tuple[UserPresenceState, bool, bool]:
"""Given a presence update:
1. Add any appropriate timers.
Expand All @@ -2129,6 +2139,8 @@ def handle_update(
is_mine: Whether the user is ours
wheel_timer
now: Time now in ms
persist: True if this state should persist until another update occurs.
Skips insertion into wheel timers.
Returns:
3-tuple: `(new_state, persist_and_notify, federation_ping)` where:
Expand All @@ -2146,14 +2158,15 @@ def handle_update(
if is_mine:
if new_state.state == PresenceState.ONLINE:
# Idle timer
wheel_timer.insert(
now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER
)
if not persist:
wheel_timer.insert(
now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER
)

active = now - new_state.last_active_ts < LAST_ACTIVE_GRANULARITY
new_state = new_state.copy_and_replace(currently_active=active)

if active:
if active and not persist:
wheel_timer.insert(
now=now,
obj=user_id,
Expand All @@ -2162,31 +2175,34 @@ def handle_update(

if new_state.state != PresenceState.OFFLINE:
# User has stopped syncing
wheel_timer.insert(
now=now,
obj=user_id,
then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
)
if not persist:
wheel_timer.insert(
now=now,
obj=user_id,
then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
)

last_federate = new_state.last_federation_update_ts
if now - last_federate > FEDERATION_PING_INTERVAL:
# Been a while since we've poked remote servers
new_state = new_state.copy_and_replace(last_federation_update_ts=now)
federation_ping = True

if new_state.state == PresenceState.BUSY:
if new_state.state == PresenceState.BUSY and not persist:
wheel_timer.insert(
now=now,
obj=user_id,
then=new_state.last_user_sync_ts + BUSY_ONLINE_TIMEOUT,
)

else:
wheel_timer.insert(
now=now,
obj=user_id,
then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT,
)
# An update for a remote user was received.
if not persist:
wheel_timer.insert(
now=now,
obj=user_id,
then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT,
)

# Check whether the change was something worth notifying about
if should_notify(prev_state, new_state, is_mine):
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1517,7 +1517,7 @@ async def generate_sync_result(

# Presence data is included if the server has it enabled and not filtered out.
include_presence_data = bool(
self.hs_config.server.use_presence
self.hs_config.server.presence_enabled
and not sync_config.filter_collection.blocks_all_presence()
)
# Device list updates are sent if a since token is provided.
Expand Down
33 changes: 33 additions & 0 deletions synapse/module_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
Generator,
Iterable,
List,
Mapping,
Optional,
Tuple,
TypeVar,
Expand All @@ -39,6 +40,7 @@

from synapse.api import errors
from synapse.api.errors import SynapseError
from synapse.api.presence import UserPresenceState
from synapse.config import ConfigError
from synapse.events import EventBase
from synapse.events.presence_router import (
Expand Down Expand Up @@ -1184,6 +1186,37 @@ async def send_local_online_presence_to(self, users: Iterable[str]) -> None:
presence_events, [destination]
)

async def set_presence_for_users(
self, users: Mapping[str, Tuple[str, Optional[str]]]
) -> None:
"""
Update the internal presence state of users.
This can be used for either local or remote users.
Note that this method can only be run on the process that is configured to write to the
presence stream. By default, this is the main process.
Added in Synapse v1.96.0.
"""

# We pull out the presence handler here to break a cyclic
# dependency between the presence router and module API.
presence_handler = self._hs.get_presence_handler()

from synapse.handlers.presence import PresenceHandler

assert isinstance(presence_handler, PresenceHandler)

states = await presence_handler.current_state_for_users(users.keys())
for user_id, (state, status_msg) in users.items():
prev_state = states.setdefault(user_id, UserPresenceState.default(user_id))
states[user_id] = prev_state.copy_and_replace(
state=state, status_msg=status_msg
)

await presence_handler._update_states(states.values(), force_notify=True)

def looping_background_call(
self,
f: Callable,
Expand Down
6 changes: 2 additions & 4 deletions synapse/rest/client/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,13 @@ def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
self.auth = hs.get_auth()

self._use_presence = hs.config.server.use_presence

async def on_GET(
self, request: SynapseRequest, user_id: str
) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
user = UserID.from_string(user_id)

if not self._use_presence:
if not self.hs.config.server.presence_enabled:
return 200, {"presence": "offline"}

if requester.user != user:
Expand Down Expand Up @@ -96,7 +94,7 @@ async def on_PUT(
except Exception:
raise SynapseError(400, "Unable to parse state")

if self._use_presence:
if self.hs.config.server.track_presence:
await self.presence_handler.set_state(user, requester.device_id, state)

return 200, {}
Expand Down
Loading

0 comments on commit 85e5f2d

Please sign in to comment.