Instrument _check_sigs_and_hash_and_fetch to trace time spent in child concurrent calls (#13588)

Instrument `_check_sigs_and_hash_and_fetch` to trace time spent in child concurrent calls, because I've seen `_check_sigs_and_hash_and_fetch` take [10.41s to process 100 events](#13587).

Fix #13587

Part of #13356
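
For context, the kind of instrumentation this commit adds can be sketched with a minimal stand-in (illustration only — the real `trace` decorator and `log_kv` helper live in `synapse.logging.opentracing` and record OpenTracing spans and key/value span logs for Jaeger; the versions below just time the call and print, and the `check_one`/`check_many` functions are invented for the example):

```python
import asyncio
import functools
import time
from typing import Any, Awaitable, Callable, Dict


def trace(func: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
    """Stand-in for synapse.logging.opentracing.trace: wrap a coroutine in a
    timed "span" so time spent in each decorated child call is reported."""

    @functools.wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        start = time.monotonic()
        try:
            return await func(*args, **kwargs)
        finally:
            print(f"span {func.__name__}: {time.monotonic() - start:.3f}s")

    return wrapper


def log_kv(key_values: Dict[str, Any]) -> None:
    """Stand-in for log_kv: attach structured key/values to the current span."""
    print("span log:", key_values)


@trace
async def check_one(event_id: str) -> None:
    # Hypothetical per-event work (signature/hash checking in the real code).
    await asyncio.sleep(0.01)
    log_kv({"message": "checked event", "event_id": event_id})


@trace
async def check_many(event_ids: list) -> None:
    # Each child concurrent call becomes its own traced "span".
    await asyncio.gather(*(check_one(e) for e in event_ids))


asyncio.run(check_many([f"$event-{i}" for i in range(3)]))
```

With the real decorator these timings appear as nested spans in Jaeger rather than print output, which is what lets a figure like the 10.41s above be attributed to specific child calls.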
MadLittleMods authored Aug 24, 2022
1 parent a25a370 commit 7af07f9
Showing 5 changed files with 47 additions and 3 deletions.
1 change: 1 addition & 0 deletions changelog.d/13588.misc
@@ -0,0 +1 @@
Instrument `_check_sigs_and_hash_and_fetch` to trace time spent in child concurrent calls for understandable traces in Jaeger.
2 changes: 2 additions & 0 deletions synapse/crypto/event_signing.py
@@ -28,13 +28,15 @@
from synapse.api.room_versions import RoomVersion
from synapse.events import EventBase
from synapse.events.utils import prune_event, prune_event_dict
from synapse.logging.opentracing import trace
from synapse.types import JsonDict

logger = logging.getLogger(__name__)

Hasher = Callable[[bytes], "hashlib._Hash"]


@trace
def check_event_content_hash(
event: EventBase, hash_algorithm: Hasher = hashlib.sha256
) -> bool:
2 changes: 2 additions & 0 deletions synapse/events/spamcheck.py
@@ -32,6 +32,7 @@

import synapse
from synapse.api.errors import Codes
from synapse.logging.opentracing import trace
from synapse.rest.media.v1._base import FileInfo
from synapse.rest.media.v1.media_storage import ReadableFileWrapper
from synapse.spam_checker_api import RegistrationBehaviour
@@ -378,6 +379,7 @@ def register_callbacks(
if check_media_file_for_spam is not None:
self._check_media_file_for_spam_callbacks.append(check_media_file_for_spam)

@trace
async def check_event_for_spam(
self, event: "synapse.events.EventBase"
) -> Union[Tuple[Codes, JsonDict], str]:
22 changes: 22 additions & 0 deletions synapse/federation/federation_base.py
@@ -23,6 +23,7 @@
from synapse.events import EventBase, make_event_from_dict
from synapse.events.utils import prune_event, validate_canonicaljson
from synapse.http.servlet import assert_params_in_dict
from synapse.logging.opentracing import log_kv, trace
from synapse.types import JsonDict, get_domain_from_id

if TYPE_CHECKING:
@@ -55,6 +56,7 @@ def __init__(self, hs: "HomeServer"):
self._clock = hs.get_clock()
self._storage_controllers = hs.get_storage_controllers()

@trace
async def _check_sigs_and_hash(
self, room_version: RoomVersion, pdu: EventBase
) -> EventBase:
@@ -97,17 +99,36 @@ async def _check_sigs_and_hash(
"Event %s seems to have been redacted; using our redacted copy",
pdu.event_id,
)
log_kv(
{
"message": "Event seems to have been redacted; using our redacted copy",
"event_id": pdu.event_id,
}
)
else:
logger.warning(
"Event %s content has been tampered, redacting",
pdu.event_id,
)
log_kv(
{
"message": "Event content has been tampered, redacting",
"event_id": pdu.event_id,
}
)
return redacted_event

spam_check = await self.spam_checker.check_event_for_spam(pdu)

if spam_check != self.spam_checker.NOT_SPAM:
logger.warning("Event contains spam, soft-failing %s", pdu.event_id)
log_kv(
{
"message": "Event contains spam, redacting (to save disk space) "
"as well as soft-failing (to stop using the event in prev_events)",
"event_id": pdu.event_id,
}
)
# we redact (to save disk space) as well as soft-failing (to stop
# using the event in prev_events).
redacted_event = prune_event(pdu)
@@ -117,6 +138,7 @@ async def _check_sigs_and_hash(
return pdu


@trace
async def _check_sigs_on_pdu(
keyring: Keyring, room_version: RoomVersion, pdu: EventBase
) -> None:
23 changes: 20 additions & 3 deletions synapse/federation/federation_client.py
Expand Up @@ -61,7 +61,7 @@
)
from synapse.federation.transport.client import SendJoinResponse
from synapse.http.types import QueryParams
from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace
from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, tag_args, trace
from synapse.types import JsonDict, UserID, get_domain_from_id
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
@@ -587,11 +587,15 @@ async def _check_sigs_and_hash_and_fetch(
Returns:
A list of PDUs that have valid signatures and hashes.
"""
set_tag(
SynapseTags.RESULT_PREFIX + "pdus.length",
str(len(pdus)),
)

# We limit how many PDUs we check at once, as if we try to do hundreds
# of thousands of PDUs at once we see large memory spikes.

valid_pdus = []
valid_pdus: List[EventBase] = []

async def _execute(pdu: EventBase) -> None:
valid_pdu = await self._check_sigs_and_hash_and_fetch_one(
@@ -607,6 +611,8 @@ async def _execute(pdu: EventBase) -> None:

return valid_pdus

@trace
@tag_args
async def _check_sigs_and_hash_and_fetch_one(
self,
pdu: EventBase,
@@ -639,16 +645,27 @@ async def _check_sigs_and_hash_and_fetch_one(
except InvalidEventSignatureError as e:
logger.warning(
"Signature on retrieved event %s was invalid (%s). "
"Checking local store/orgin server",
"Checking local store/origin server",
pdu.event_id,
e,
)
log_kv(
{
"message": "Signature on retrieved event was invalid. "
"Checking local store/origin server",
"event_id": pdu.event_id,
"InvalidEventSignatureError": e,
}
)

# Check local db.
res = await self.store.get_event(
pdu.event_id, allow_rejected=True, allow_none=True
)

# If the PDU fails its signature check and we don't have it in our
# database, we then request it from sender's server (if that is not the
# same as `origin`).
pdu_origin = get_domain_from_id(pdu.sender)
if not res and pdu_origin != origin:
try:
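
The comment in `_check_sigs_and_hash_and_fetch` about limiting how many PDUs are checked at once (to avoid large memory spikes) sits next to code that fans the per-PDU check out concurrently; the file imports `concurrently_execute` from `synapse.util.async_helpers` (visible in the import context above) for this kind of bounded fan-out. As a rough, generic illustration of that pattern (not Synapse's implementation — the helper name, `check_pdu`, and the limit of 100 are invented for the example), a semaphore-bounded version might look like:

```python
import asyncio
from typing import Awaitable, Callable, Iterable, TypeVar

T = TypeVar("T")


async def bounded_concurrently_execute(
    func: Callable[[T], Awaitable[None]],
    args: Iterable[T],
    limit: int,
) -> None:
    """Run func over args with at most `limit` calls in flight at a time,
    so processing hundreds of thousands of items does not all run at once."""
    semaphore = asyncio.Semaphore(limit)

    async def _run(arg: T) -> None:
        async with semaphore:
            await func(arg)

    await asyncio.gather(*(_run(arg) for arg in args))


async def check_pdu(pdu_id: str) -> None:
    # Hypothetical stand-in for _check_sigs_and_hash_and_fetch_one.
    await asyncio.sleep(0.001)


async def main() -> None:
    pdu_ids = [f"$pdu-{i}" for i in range(1000)]
    await bounded_concurrently_execute(check_pdu, pdu_ids, limit=100)


asyncio.run(main())
```

Because `_check_sigs_and_hash_and_fetch_one` is now decorated with `@trace` and `@tag_args`, each of those bounded child calls shows up as its own span under the parent `_check_sigs_and_hash_and_fetch` span.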
