This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Improve tracing for to device messages #9686

Merged
7 commits, merged on Apr 1, 2021
Changes from all commits
1 change: 1 addition & 0 deletions changelog.d/9686.misc
@@ -0,0 +1 @@
Improve Jaeger tracing for `to_device` messages.
8 changes: 8 additions & 0 deletions synapse/federation/sender/per_destination_queue.py
@@ -29,6 +29,7 @@
from synapse.events import EventBase
from synapse.federation.units import Edu
from synapse.handlers.presence import format_user_presence_state
from synapse.logging.opentracing import SynapseTags, set_tag
from synapse.metrics import sent_transactions_counter
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import ReadReceipt
@@ -557,6 +558,13 @@ async def _get_to_device_message_edus(self, limit: int) -> Tuple[List[Edu], int]
        contents, stream_id = await self._store.get_new_device_msgs_for_remote(
            self._destination, last_device_stream_id, to_device_stream_id, limit
        )
        for content in contents:
            message_id = content.get("message_id")
            if not message_id:
                continue

            set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)

        edus = [
            Edu(
                origin=self._server_name,
35 changes: 20 additions & 15 deletions synapse/handlers/devicemessage.py
@@ -21,10 +21,10 @@
from synapse.api.ratelimiting import Ratelimiter
from synapse.logging.context import run_in_background
from synapse.logging.opentracing import (
    SynapseTags,
    get_active_span_text_map,
    log_kv,
    set_tag,
    start_active_span,
)
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.types import JsonDict, Requester, UserID, get_domain_from_id
@@ -183,7 +183,10 @@ async def send_device_message(
    ) -> None:
        sender_user_id = requester.user.to_string()

        set_tag("number_of_messages", len(messages))
        message_id = random_string(16)
        set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)

        log_kv({"number_of_to_device_messages": len(messages)})
        set_tag("sender", sender_user_id)
        local_messages = {}
        remote_messages = {}  # type: Dict[str, Dict[str, Dict[str, JsonDict]]]
@@ -205,32 +208,35 @@
"content": message_content,
"type": message_type,
"sender": sender_user_id,
"message_id": message_id,
}
for device_id, message_content in by_device.items()
}
if messages_by_device:
local_messages[user_id] = messages_by_device
log_kv(
{
"user_id": user_id,
"device_id": list(messages_by_device),
}
)
else:
destination = get_domain_from_id(user_id)
remote_messages.setdefault(destination, {})[user_id] = by_device

message_id = random_string(16)

context = get_active_span_text_map()

remote_edu_contents = {}
for destination, messages in remote_messages.items():
with start_active_span("to_device_for_user"):
set_tag("destination", destination)
remote_edu_contents[destination] = {
"messages": messages,
"sender": sender_user_id,
"type": message_type,
"message_id": message_id,
"org.matrix.opentracing_context": json_encoder.encode(context),
}
log_kv({"destination": destination})
remote_edu_contents[destination] = {
"messages": messages,
"sender": sender_user_id,
"type": message_type,
"message_id": message_id,
"org.matrix.opentracing_context": json_encoder.encode(context),
}

log_kv({"local_messages": local_messages})

Review comment (Member): Looks like the new log_kv({user_id, device_id}) above is meant to be the replacement for this log_kv and the one below. Is that correct?

Reply (Member, author): Yup. This log was a bit useless as it gave the entire message, which then got truncated by Jaeger, so you couldn't really see what was going on.

        stream_id = await self.store.add_messages_to_device_inbox(
            local_messages, remote_edu_contents
        )
@@ -239,7 +245,6 @@ async def send_device_message(
"to_device_key", stream_id, users=local_messages.keys()
)

log_kv({"remote_messages": remote_messages})
if self.federation_sender:
for destination in remote_messages.keys():
# Enqueue a new federation transaction to send the new
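For reference, a rough sketch of the shape of one locally-stored to_device message after this change. The field names come from the dict built in send_device_message above; the values here are invented for illustration. The federation EDU built for remote users carries the same message_id plus the org.matrix.opentracing_context entry, so the receiving homeserver can link its trace back to the sender's span.

# Illustrative only: field names are from the handler above, values are made up.
example_local_message = {
    "content": {"body": "example payload"},  # caller-supplied message content
    "type": "m.example.to_device",           # caller-supplied message type
    "sender": "@alice:example.org",          # resolved from the requester
    "message_id": "XyZ0aBcD1eFgH2iJ",        # random_string(16), used only for tracing
}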
18 changes: 17 additions & 1 deletion synapse/handlers/sync.py
@@ -24,6 +24,7 @@
from synapse.api.filtering import FilterCollection
from synapse.events import EventBase
from synapse.logging.context import current_context
from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span
from synapse.push.clientformat import format_push_rules_for_user
from synapse.storage.roommember import MemberSummary
from synapse.storage.state import StateFilter
@@ -340,7 +341,14 @@ async def current_sync_for_user(
        full_state: bool = False,
    ) -> SyncResult:
        """Get the sync for client needed to match what the server has now."""
        return await self.generate_sync_result(sync_config, since_token, full_state)
        with start_active_span("current_sync_for_user"):
            log_kv({"since_token": since_token})
            sync_result = await self.generate_sync_result(
                sync_config, since_token, full_state
            )

            set_tag(SynapseTags.SYNC_RESULT, bool(sync_result))
            return sync_result

    async def push_rules_for_user(self, user: UserID) -> JsonDict:
        user_id = user.to_string()
@@ -964,6 +972,7 @@ async def generate_sync_result(
        # to query up to a given point.
        # Always use the `now_token` in `SyncResultBuilder`
        now_token = self.event_sources.get_current_token()
        log_kv({"now_token": now_token})

        logger.debug(
            "Calculating sync response for %r between %s and %s",
@@ -1225,6 +1234,13 @@ async def _generate_sync_entry_for_to_device(
                user_id, device_id, since_stream_id, now_token.to_device_key
            )

            for message in messages:
                # We pop here as we shouldn't be sending the message ID down
                # `/sync`
                message_id = message.pop("message_id", None)
                if message_id:
                    set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)

            logger.debug(
                "Returning %d to-device messages between %d and %d (current token: %d)",
                len(messages),
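A note on the SYNC_RESULT tag set in current_sync_for_user above: tagging the span with bool(sync_result) is only meaningful because SyncResult defines __bool__, so a sync with nothing new for the client is falsy. A toy sketch of that idea, illustrative only (the real class lives in synapse/handlers/sync.py and checks more fields than shown here):

class SyncResultSketch:
    """Toy stand-in for SyncResult, purely to illustrate the __bool__ behaviour."""

    def __init__(self, presence=(), joined=(), to_device=()):
        self.presence = list(presence)
        self.joined = list(joined)
        self.to_device = list(to_device)

    def __bool__(self) -> bool:
        # Falsy when there is nothing new to send down /sync.
        return bool(self.presence or self.joined or self.to_device)


assert not SyncResultSketch()  # empty sync -> SYNC_RESULT tag would be False
assert SyncResultSketch(to_device=[{"type": "m.example"}])  # new data -> True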
6 changes: 5 additions & 1 deletion synapse/handlers/typing.py
@@ -19,7 +19,10 @@

from synapse.api.errors import AuthError, ShadowBanError, SynapseError
from synapse.appservice import ApplicationService
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.background_process_metrics import (
    run_as_background_process,
    wrap_as_background_process,
)
from synapse.replication.tcp.streams import TypingStream
from synapse.types import JsonDict, Requester, UserID, get_domain_from_id
from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -86,6 +89,7 @@ def _reset(self) -> None:
        self._member_last_federation_poke = {}
        self.wheel_timer = WheelTimer(bucket_size=5000)

    @wrap_as_background_process("typing._handle_timeouts")
    def _handle_timeouts(self) -> None:
        logger.debug("Checking for typing timeouts")

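The wrap_as_background_process decorator applied above is, roughly, a decorator form of run_as_background_process: each firing of _handle_timeouts runs as a named background process with its own logging context, which also gives it its own tracing scope and metrics. A simplified sketch of that relationship, assuming the real implementation in synapse.metrics.background_process_metrics (which does more, e.g. CPU and database accounting):

from functools import wraps

from synapse.metrics.background_process_metrics import run_as_background_process


def wrap_as_background_process_sketch(desc):
    """Simplified stand-in for wrap_as_background_process, for illustration only."""

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Run the wrapped function as a named background process so it gets
            # its own logging context (and therefore its own trace scope).
            return run_as_background_process(desc, func, *args, **kwargs)

        return wrapper

    return decorator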
8 changes: 8 additions & 0 deletions synapse/logging/opentracing.py
@@ -259,6 +259,14 @@ def report_span(self, span):
logger = logging.getLogger(__name__)


class SynapseTags:
    # The message ID of any to_device message processed
    TO_DEVICE_MESSAGE_ID = "to_device.message_id"

    # Whether the sync response has new data to be returned to the client.
    SYNC_RESULT = "sync.new_data"

Review comment on lines +266 to +267 (Member): Is the purpose of this to check whether a to_device message came down /sync for a user? If so, then won't events other than to_device messages lower the signal-to-noise ratio?

Reply (Member, author): Ish, I was more interested in signalling whether we'd found anything or not. I might go in and add per-section flags later, but I'm a bit wary of adding too much at once.


# Block everything by default
# A regex which matches the server_names to expose traces for.
# None means 'block everything'.
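The constants above are plain span-tag names; they are set on whichever span is currently active, exactly as the handler changes in this PR do. A minimal usage sketch (the span name and values here are invented):

from synapse.logging.opentracing import SynapseTags, set_tag, start_active_span

with start_active_span("example_to_device_work"):  # hypothetical span name
    # Tag the active span so Jaeger searches on to_device.message_id or
    # sync.new_data can find it.
    set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, "XyZ0aBcD1eFgH2iJ")
    set_tag(SynapseTags.SYNC_RESULT, True)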
45 changes: 43 additions & 2 deletions synapse/notifier.py
@@ -39,6 +39,7 @@
from synapse.events import EventBase
from synapse.handlers.presence import format_user_presence_state
from synapse.logging.context import PreserveLoggingContext
from synapse.logging.opentracing import log_kv, start_active_span
from synapse.logging.utils import log_function
from synapse.metrics import LaterGauge
from synapse.streams.config import PaginationConfig
@@ -136,6 +137,15 @@ def notify(
        self.last_notified_ms = time_now_ms
        noify_deferred = self.notify_deferred

        log_kv(
            {
                "notify": self.user_id,
                "stream": stream_key,
                "stream_id": stream_id,
                "listeners": self.count_listeners(),
            }
        )

        users_woken_by_stream_counter.labels(stream_key).inc()

        with PreserveLoggingContext():
@@ -404,6 +414,13 @@ def on_new_event(
        with Measure(self.clock, "on_new_event"):
            user_streams = set()

            log_kv(
                {
                    "waking_up_explicit_users": len(users),
                    "waking_up_explicit_rooms": len(rooms),
                }
            )

            for user in users:
                user_stream = self.user_to_user_stream.get(str(user))
                if user_stream is not None:
@@ -476,21 +493,45 @@ async def wait_for_events(
                        (end_time - now) / 1000.0,
                        self.hs.get_reactor(),
                    )
                    with PreserveLoggingContext():
                        await listener.deferred

                    with start_active_span("wait_for_events.deferred"):
                        log_kv(
                            {
                                "wait_for_events": "sleep",
                                "token": prev_token,
                            }
                        )

                        with PreserveLoggingContext():
                            await listener.deferred

                        log_kv(
                            {
                                "wait_for_events": "woken",
                                "token": user_stream.current_token,
                            }
                        )

                    current_token = user_stream.current_token

                    result = await callback(prev_token, current_token)
                    log_kv(
                        {
                            "wait_for_events": "result",
                            "result": bool(result),
                        }
                    )
                    if result:
                        break

                    # Update the prev_token to the current_token since nothing
                    # has happened between the old prev_token and the current_token
                    prev_token = current_token
                except defer.TimeoutError:
                    log_kv({"wait_for_events": "timeout"})
                    break
                except defer.CancelledError:
                    log_kv({"wait_for_events": "cancelled"})
                    break

            if result is None:
Expand Down