Skip to content
24 changes: 14 additions & 10 deletions synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@


import itertools
import json
import logging
from typing import TYPE_CHECKING, Any, Collection, Iterable, List, Optional, Tuple

Expand Down Expand Up @@ -281,17 +282,20 @@ def process_replication_rows(
== GET_E2E_CROSS_SIGNING_SIGNATURES_FOR_DEVICE_CACHE_NAME
):
# "keys" is a list of strings, where each string is a
# stringified representation of the tuple keys, i.e.
# keys: ['(@userid:domain,DEVICEID)','(@userid2:domain,DEVICEID2)']
# JSON-encoded representation of the tuple keys, i.e.
# keys: ['["@userid:domain", "DEVICEID"]','["@userid2:domain", "DEVICEID2"]']
#
# This is a side-effect of not being able to send nested information over replication.
for tuple_key in row.keys:
user_id, device_id = (
# Remove the leading and following parantheses.
tuple_key[1:-1]
# Split by comma
.split(",")
)
# This is a side-effect of not being able to send nested
# information over replication.
for json_str in row.keys:
try:
user_id, device_id = json.loads(json_str)
except (json.JSONDecodeError, TypeError):
logger.error(
"Failed to deserialise cache key as valid JSON: %s",
json_str,
)
continue

# Invalidate each key.
#
Expand Down
26 changes: 23 additions & 3 deletions synapse/storage/databases/main/end_to_end_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#
#
import abc
import json
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -1862,10 +1863,29 @@ def _store_e2e_cross_signing_signatures(
]

if to_invalidate:
self._invalidate_cache_and_stream_bulk(
# Invalidate the local cache of this worker.
for cache_key in to_invalidate:
txn.call_after(
self._get_e2e_cross_signing_signatures_for_device.invalidate,
cache_key,
)

# Stream cache invalidate keys over replication.
#
# We can only send a primitive per function argument across
# replication.
#
# Encode the array of strings as a JSON string, and we'll unpack
# it on the other side.
to_send = [
(json.dumps([user_id, item.target_device_id]),)
for item in signatures
Comment on lines +1873 to +1874
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not possible to json.dumps the entries of to_invalidate?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can; it looks like this:

to_send = [
    (json.dumps(item[0]),)
    for item in to_invalidate
]

which seemed less clear as to what we're actually sending than what's there now.

]

self._send_invalidation_to_replication_bulk(
txn,
self._get_e2e_cross_signing_signatures_for_device,
to_invalidate,
cache_name=self._get_e2e_cross_signing_signatures_for_device.__name__,
key_tuples=to_send,
)

await self.db_pool.runInteraction(
Expand Down