Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Opentrace e2e keys (#5855)
Browse files Browse the repository at this point in the history
Add opentracing tags and logs for e2e keys
  • Loading branch information
JorikSchellekens authored Aug 22, 2019
1 parent c9f11d0 commit 9a6f2be
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 7 deletions.
1 change: 1 addition & 0 deletions changelog.d/5855.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Opentracing for room and e2e keys.
3 changes: 3 additions & 0 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from synapse.federation.units import Edu, Transaction
from synapse.http.endpoint import parse_server_name
from synapse.logging.context import nested_logging_context
from synapse.logging.opentracing import log_kv, trace
from synapse.logging.utils import log_function
from synapse.replication.http.federation import (
ReplicationFederationSendEduRestServlet,
Expand Down Expand Up @@ -507,6 +508,7 @@ def on_query_client_keys(self, origin, content):
def on_query_user_devices(self, origin, user_id):
return self.on_query_request("user_devices", user_id)

@trace
@defer.inlineCallbacks
@log_function
def on_claim_client_keys(self, origin, content):
Expand All @@ -515,6 +517,7 @@ def on_claim_client_keys(self, origin, content):
for device_id, algorithm in device_keys.items():
query.append((user_id, device_id, algorithm))

log_kv({"message": "Claiming one time keys.", "user, device pairs": query})
results = yield self.store.claim_e2e_one_time_keys(query)

json_result = {}
Expand Down
52 changes: 51 additions & 1 deletion synapse/handlers/e2e_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

from synapse.api.errors import CodeMessageException, SynapseError
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace
from synapse.types import UserID, get_domain_from_id
from synapse.util import unwrapFirstError
from synapse.util.retryutils import NotRetryingDestination
Expand All @@ -46,6 +47,7 @@ def __init__(self, hs):
"client_keys", self.on_federation_query_client_keys
)

@trace
@defer.inlineCallbacks
def query_devices(self, query_body, timeout):
""" Handle a device key query from a client
Expand Down Expand Up @@ -81,6 +83,9 @@ def query_devices(self, query_body, timeout):
else:
remote_queries[user_id] = device_ids

set_tag("local_key_query", local_query)
set_tag("remote_key_query", remote_queries)

# First get local devices.
failures = {}
results = {}
Expand Down Expand Up @@ -121,6 +126,7 @@ def query_devices(self, query_body, timeout):
r[user_id] = remote_queries[user_id]

# Now fetch any devices that we don't have in our cache
@trace
@defer.inlineCallbacks
def do_remote_query(destination):
"""This is called when we are querying the device list of a user on
Expand Down Expand Up @@ -185,6 +191,8 @@ def do_remote_query(destination):
except Exception as e:
failure = _exception_to_failure(e)
failures[destination] = failure
set_tag("error", True)
set_tag("reason", failure)

yield make_deferred_yieldable(
defer.gatherResults(
Expand All @@ -198,6 +206,7 @@ def do_remote_query(destination):

return {"device_keys": results, "failures": failures}

@trace
@defer.inlineCallbacks
def query_local_devices(self, query):
"""Get E2E device keys for local users
Expand All @@ -210,13 +219,22 @@ def query_local_devices(self, query):
defer.Deferred: (resolves to dict[string, dict[string, dict]]):
map from user_id -> device_id -> device details
"""
set_tag("local_query", query)
local_query = []

result_dict = {}
for user_id, device_ids in query.items():
# we use UserID.from_string to catch invalid user ids
if not self.is_mine(UserID.from_string(user_id)):
logger.warning("Request for keys for non-local user %s", user_id)
log_kv(
{
"message": "Requested a local key for a user which"
" was not local to the homeserver",
"user_id": user_id,
}
)
set_tag("error", True)
raise SynapseError(400, "Not a user here")

if not device_ids:
Expand All @@ -241,6 +259,7 @@ def query_local_devices(self, query):
r["unsigned"]["device_display_name"] = display_name
result_dict[user_id][device_id] = r

log_kv(results)
return result_dict

@defer.inlineCallbacks
Expand All @@ -251,6 +270,7 @@ def on_federation_query_client_keys(self, query_body):
res = yield self.query_local_devices(device_keys_query)
return {"device_keys": res}

@trace
@defer.inlineCallbacks
def claim_one_time_keys(self, query, timeout):
local_query = []
Expand All @@ -265,6 +285,9 @@ def claim_one_time_keys(self, query, timeout):
domain = get_domain_from_id(user_id)
remote_queries.setdefault(domain, {})[user_id] = device_keys

set_tag("local_key_query", local_query)
set_tag("remote_key_query", remote_queries)

results = yield self.store.claim_e2e_one_time_keys(local_query)

json_result = {}
Expand All @@ -276,8 +299,10 @@ def claim_one_time_keys(self, query, timeout):
key_id: json.loads(json_bytes)
}

@trace
@defer.inlineCallbacks
def claim_client_keys(destination):
set_tag("destination", destination)
device_keys = remote_queries[destination]
try:
remote_result = yield self.federation.claim_client_keys(
Expand All @@ -290,6 +315,8 @@ def claim_client_keys(destination):
except Exception as e:
failure = _exception_to_failure(e)
failures[destination] = failure
set_tag("error", True)
set_tag("reason", failure)

yield make_deferred_yieldable(
defer.gatherResults(
Expand All @@ -313,9 +340,11 @@ def claim_client_keys(destination):
),
)

log_kv({"one_time_keys": json_result, "failures": failures})
return {"one_time_keys": json_result, "failures": failures}

@defer.inlineCallbacks
@tag_args
def upload_keys_for_user(self, user_id, device_id, keys):

time_now = self.clock.time_msec()
Expand All @@ -329,19 +358,38 @@ def upload_keys_for_user(self, user_id, device_id, keys):
user_id,
time_now,
)
log_kv(
{
"message": "Updating device_keys for user.",
"user_id": user_id,
"device_id": device_id,
}
)
# TODO: Sign the JSON with the server key
changed = yield self.store.set_e2e_device_keys(
user_id, device_id, time_now, device_keys
)
if changed:
# Only notify about device updates *if* the keys actually changed
yield self.device_handler.notify_device_update(user_id, [device_id])

else:
log_kv({"message": "Not updating device_keys for user", "user_id": user_id})
one_time_keys = keys.get("one_time_keys", None)
if one_time_keys:
log_kv(
{
"message": "Updating one_time_keys for device.",
"user_id": user_id,
"device_id": device_id,
}
)
yield self._upload_one_time_keys_for_user(
user_id, device_id, time_now, one_time_keys
)
else:
log_kv(
{"message": "Did not update one_time_keys", "reason": "no keys given"}
)

# the device should have been registered already, but it may have been
# deleted due to a race with a DELETE request. Or we may be using an
Expand All @@ -352,6 +400,7 @@ def upload_keys_for_user(self, user_id, device_id, keys):

result = yield self.store.count_e2e_one_time_keys(user_id, device_id)

set_tag("one_time_key_counts", result)
return {"one_time_key_counts": result}

@defer.inlineCallbacks
Expand Down Expand Up @@ -395,6 +444,7 @@ def _upload_one_time_keys_for_user(
(algorithm, key_id, encode_canonical_json(key).decode("ascii"))
)

log_kv({"message": "Inserting new one_time_keys.", "keys": new_keys})
yield self.store.add_e2e_one_time_keys(user_id, device_id, time_now, new_keys)


Expand Down
28 changes: 26 additions & 2 deletions synapse/handlers/e2e_room_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
StoreError,
SynapseError,
)
from synapse.logging.opentracing import log_kv, trace
from synapse.util.async_helpers import Linearizer

logger = logging.getLogger(__name__)
Expand All @@ -49,6 +50,7 @@ def __init__(self, hs):
# changed.
self._upload_linearizer = Linearizer("upload_room_keys_lock")

@trace
@defer.inlineCallbacks
def get_room_keys(self, user_id, version, room_id=None, session_id=None):
"""Bulk get the E2E room keys for a given backup, optionally filtered to a given
Expand Down Expand Up @@ -84,8 +86,10 @@ def get_room_keys(self, user_id, version, room_id=None, session_id=None):
user_id, version, room_id, session_id
)

log_kv(results)
return results

@trace
@defer.inlineCallbacks
def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
"""Bulk delete the E2E room keys for a given backup, optionally filtered to a given
Expand All @@ -107,6 +111,7 @@ def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
with (yield self._upload_linearizer.queue(user_id)):
yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)

@trace
@defer.inlineCallbacks
def upload_room_keys(self, user_id, version, room_keys):
"""Bulk upload a list of room keys into a given backup version, asserting
Expand Down Expand Up @@ -186,7 +191,14 @@ def _upload_room_key(self, user_id, version, room_id, session_id, room_key):
session_id(str): the session whose room_key we're setting
room_key(dict): the room_key being set
"""

log_kv(
{
"message": "Trying to upload room key",
"room_id": room_id,
"session_id": session_id,
"user_id": user_id,
}
)
# get the room_key for this particular row
current_room_key = None
try:
Expand All @@ -195,14 +207,23 @@ def _upload_room_key(self, user_id, version, room_id, session_id, room_key):
)
except StoreError as e:
if e.code == 404:
pass
log_kv(
{
"message": "Room key not found.",
"room_id": room_id,
"user_id": user_id,
}
)
else:
raise

if self._should_replace_room_key(current_room_key, room_key):
log_kv({"message": "Replacing room key."})
yield self.store.set_e2e_room_key(
user_id, version, room_id, session_id, room_key
)
else:
log_kv({"message": "Not replacing room_key."})

@staticmethod
def _should_replace_room_key(current_room_key, room_key):
Expand Down Expand Up @@ -236,6 +257,7 @@ def _should_replace_room_key(current_room_key, room_key):
return False
return True

@trace
@defer.inlineCallbacks
def create_version(self, user_id, version_info):
"""Create a new backup version. This automatically becomes the new
Expand Down Expand Up @@ -294,6 +316,7 @@ def get_version_info(self, user_id, version=None):
raise
return res

@trace
@defer.inlineCallbacks
def delete_version(self, user_id, version=None):
"""Deletes a given version of the user's e2e_room_keys backup
Expand All @@ -314,6 +337,7 @@ def delete_version(self, user_id, version=None):
else:
raise

@trace
@defer.inlineCallbacks
def update_version(self, user_id, version, version_info):
"""Update the info about a given version of the user's backup
Expand Down
13 changes: 12 additions & 1 deletion synapse/rest/client/v2_alpha/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
parse_json_object_from_request,
parse_string,
)
from synapse.logging.opentracing import log_kv, set_tag, trace_using_operation_name
from synapse.types import StreamToken

from ._base import client_patterns
Expand Down Expand Up @@ -68,6 +69,7 @@ def __init__(self, hs):
self.auth = hs.get_auth()
self.e2e_keys_handler = hs.get_e2e_keys_handler()

@trace_using_operation_name("upload_keys")
@defer.inlineCallbacks
def on_POST(self, request, device_id):
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
Expand All @@ -78,6 +80,14 @@ def on_POST(self, request, device_id):
# passing the device_id here is deprecated; however, we allow it
# for now for compatibility with older clients.
if requester.device_id is not None and device_id != requester.device_id:
set_tag("error", True)
log_kv(
{
"message": "Client uploading keys for a different device",
"logged_in_id": requester.device_id,
"key_being_uploaded": device_id,
}
)
logger.warning(
"Client uploading keys for a different device "
"(logged in as %s, uploading for %s)",
Expand Down Expand Up @@ -178,10 +188,11 @@ def on_GET(self, request):
requester = yield self.auth.get_user_by_req(request, allow_guest=True)

from_token_string = parse_string(request, "from")
set_tag("from", from_token_string)

# We want to enforce they do pass us one, but we ignore it and return
# changes after the "to" as well as before.
parse_string(request, "to")
set_tag("to", parse_string(request, "to"))

from_token = StreamToken.from_string(from_token_string)

Expand Down
Loading

0 comments on commit 9a6f2be

Please sign in to comment.