Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add DeferredCache.get_immediate method #8568

Merged
merged 4 commits into from
Oct 19, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8568.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add `get_immediate` method to `DeferredCache`.
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ async def add_pusher(
lock=False,
)

user_has_pusher = self.get_if_user_has_pusher.cache.get(
user_has_pusher = self.get_if_user_has_pusher.cache.get_immediate(
(user_id,), None, update_metrics=False
)

Expand Down
11 changes: 1 addition & 10 deletions synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from synapse.storage.util.id_generators import StreamIdGenerator
from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache

Expand Down Expand Up @@ -413,18 +412,10 @@ def _invalidate_get_users_with_receipts_in_room(
if receipt_type != "m.read":
return

# Returns either an ObservableDeferred or the raw result
res = self.get_users_with_read_receipts_in_room.cache.get(
res = self.get_users_with_read_receipts_in_room.cache.get_immediate(
room_id, None, update_metrics=False
)

# first handle the ObservableDeferred case
if isinstance(res, ObservableDeferred):
if res.has_called():
res = res.get_result()
else:
res = None

if res and user_id in res:
# We'd only be adding to the set, so no point invalidating if the
# user is already there
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ async def _get_joined_users_from_context(
# If we do then we can reuse that result and simply update it with
# any membership changes in `delta_ids`
if context.prev_group and context.delta_ids:
prev_res = self._get_joined_users_from_context.cache.get(
prev_res = self._get_joined_users_from_context.cache.get_immediate(
(room_id, context.prev_group), None
)
if prev_res and isinstance(prev_res, dict):
Expand Down
35 changes: 25 additions & 10 deletions synapse/util/caches/deferred_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,16 @@

import enum
import threading
from typing import Callable, Generic, Iterable, MutableMapping, Optional, TypeVar, cast
from typing import (
Callable,
Generic,
Iterable,
MutableMapping,
Optional,
TypeVar,
Union,
cast,
)

from prometheus_client import Gauge

Expand All @@ -33,7 +42,7 @@
["name"],
)


T = TypeVar("T")
KT = TypeVar("KT")
VT = TypeVar("VT")

Expand Down Expand Up @@ -119,21 +128,21 @@ def check_thread(self):
def get(
self,
key: KT,
default=_Sentinel.sentinel,
callback: Optional[Callable[[], None]] = None,
update_metrics: bool = True,
):
) -> Union[ObservableDeferred, VT]:
"""Looks the key up in the caches.

Args:
key(tuple)
default: What is returned if key is not in the caches. If not
specified then function throws KeyError instead
callback(fn): Gets called when the entry in the cache is invalidated
update_metrics (bool): whether to update the cache hit rate metrics

Returns:
Either an ObservableDeferred or the result itself

Raises:
KeyError if the key is not found in the cache
"""
callbacks = [callback] if callback else []
val = self._pending_deferred_cache.get(key, _Sentinel.sentinel)
Expand All @@ -145,13 +154,19 @@ def get(
m.inc_hits()
return val.deferred

val = self.cache.get(
key, default, callbacks=callbacks, update_metrics=update_metrics
val2 = self.cache.get(
key, _Sentinel.sentinel, callbacks=callbacks, update_metrics=update_metrics
)
if val is _Sentinel.sentinel:
if val2 is _Sentinel.sentinel:
raise KeyError()
else:
return val
return val2

def get_immediate(
self, key: KT, default: T, update_metrics: bool = True
) -> Union[VT, T]:
"""If we have a *completed* cached value, return it."""
return self.cache.get(key, default, update_metrics=update_metrics)

def set(
self,
Expand Down
27 changes: 23 additions & 4 deletions tests/util/caches/test_deferred_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,22 @@ def test_hit(self):

self.assertEquals(cache.get("foo"), 123)

def test_get_immediate(self):
cache = DeferredCache("test")
d1 = defer.Deferred()
cache.set("key1", d1)

# get_immediate should return default
v = cache.get_immediate("key1", 1)
self.assertEqual(v, 1)

# now complete the set
d1.callback(2)

# get_immediate should return result
v = cache.get_immediate("key1", 1)
self.assertEqual(v, 2)

def test_invalidate(self):
cache = DeferredCache("test")
cache.prefill(("foo",), 123)
Expand Down Expand Up @@ -80,17 +96,20 @@ def record_callback(idx):
# now do the invalidation
cache.invalidate_all()

# lookup should return none
self.assertIsNone(cache.get("key1", None))
self.assertIsNone(cache.get("key2", None))
# lookup should fail
with self.assertRaises(KeyError):
cache.get("key1")
with self.assertRaises(KeyError):
cache.get("key2")

# both callbacks should have been callbacked
self.assertTrue(callback_record[0], "Invalidation callback for key1 not called")
self.assertTrue(callback_record[1], "Invalidation callback for key2 not called")

# letting the other lookup complete should do nothing
d1.callback("result1")
self.assertIsNone(cache.get("key1", None))
with self.assertRaises(KeyError):
cache.get("key1", None)

def test_eviction(self):
cache = DeferredCache(
Expand Down