This repository has been archived by the owner on Apr 26, 2024. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Consistently compare to the earliest known stream position in the stream change cache #14435
Merged
Merged
Changes from 14 commits
Commits
Show all changes
16 commits
Select commit
Hold shift + click to select a range
64dc954
Add (failing) tests.
clokep c969ab4
Use strict comparisons in stream change cache.
clokep 3a46941
Newsfragment
clokep f0dbf31
Update asserts.
clokep 7d0e07f
Clarify comments.
clokep 9842054
Use peek instead of bisect.
clokep 0c6c603
Clarify code & comments.
clokep e2e4f37
Improve docstrings.
clokep 532f69a
Update tests for changes.
clokep dff4b5c
Merge branch 'develop' into clokep/stream-cache-fix
clokep 6b09889
Bump min sortedcontainers version.
clokep f0634bf
Merge remote-tracking branch 'origin/develop' into clokep/stream-cach…
clokep d3513a3
Fix poetry lock.
clokep ae77f8b
Newsfragment
clokep 53cd4cc
Add a comment.
clokep 865792a
Merge remote-tracking branch 'origin/develop' into clokep/stream-cach…
clokep File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Fix a long-standing bug where a device list update might not be sent to clients in certain circumstances. |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,13 +27,17 @@ | |
|
||
|
||
class StreamChangeCache: | ||
"""Keeps track of the stream positions of the latest change in a set of entities. | ||
""" | ||
Keeps track of the stream positions of the latest change in a set of entities. | ||
|
||
The entity will is typically a room ID or user ID, but can be any string. | ||
|
||
Typically the entity will be a room or user id. | ||
Can be queried for whether a specific entity has changed after a stream position | ||
or for a list of changed entities after a stream position. See the individual | ||
methods for more information. | ||
|
||
Given a list of entities and a stream position, it will give a subset of | ||
entities that may have changed since that position. If position key is too | ||
old then the cache will simply return all given entities. | ||
Only tracks to a maximum cache size, any position earlier than the earliest | ||
known stream position must be treated as unknown. | ||
""" | ||
|
||
def __init__( | ||
|
@@ -45,16 +49,20 @@ def __init__( | |
) -> None: | ||
self._original_max_size: int = max_size | ||
self._max_size = math.floor(max_size) | ||
self._entity_to_key: Dict[EntityType, int] = {} | ||
|
||
# map from stream id to the a set of entities which changed at that stream id. | ||
# map from stream id to the set of entities which changed at that stream id. | ||
self._cache: SortedDict[int, Set[EntityType]] = SortedDict() | ||
# map from entity to the stream ID of the latest change for that entity. | ||
# | ||
# Must be kept in sync with _cache. | ||
self._entity_to_key: Dict[EntityType, int] = {} | ||
|
||
# the earliest stream_pos for which we can reliably answer | ||
# get_all_entities_changed. In other words, one less than the earliest | ||
# stream_pos for which we know _cache is valid. | ||
# | ||
self._earliest_known_stream_pos = current_stream_pos | ||
|
||
self.name = name | ||
self.metrics = caches.register_cache( | ||
"cache", self.name, self._cache, resize_callback=self.set_cache_factor | ||
|
@@ -82,38 +90,74 @@ def set_cache_factor(self, factor: float) -> bool: | |
return False | ||
|
||
def has_entity_changed(self, entity: EntityType, stream_pos: int) -> bool: | ||
"""Returns True if the entity may have been updated since stream_pos""" | ||
""" | ||
Returns True if the entity may have been updated after stream_pos. | ||
|
||
Args: | ||
entity: The entity to check for changes. | ||
stream_pos: The stream position to check for changes after. | ||
|
||
Return: | ||
True if the entity may have been updated, this happens if: | ||
* The given stream position is at or earlier than the earliest | ||
known stream position. | ||
* The given stream position is earlier than the latest change for | ||
the entity. | ||
|
||
False otherwise: | ||
* The entity is unknown. | ||
* The given stream position is at or later than the latest change | ||
for the entity. | ||
""" | ||
assert isinstance(stream_pos, int) | ||
|
||
if stream_pos < self._earliest_known_stream_pos: | ||
# _cache is not valid at or before the earliest known stream position, so | ||
# return that the entity has changed. | ||
if stream_pos <= self._earliest_known_stream_pos: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
self.metrics.inc_misses() | ||
return True | ||
|
||
# If the entity is unknown, it hasn't changed. | ||
latest_entity_change_pos = self._entity_to_key.get(entity, None) | ||
if latest_entity_change_pos is None: | ||
self.metrics.inc_hits() | ||
return False | ||
|
||
# This is a known entity, return true if the stream position is earlier | ||
# than the last change. | ||
if stream_pos < latest_entity_change_pos: | ||
self.metrics.inc_misses() | ||
return True | ||
|
||
# Otherwise, the stream position is after the latest change: return false. | ||
self.metrics.inc_hits() | ||
return False | ||
|
||
def get_entities_changed( | ||
self, entities: Collection[EntityType], stream_pos: int | ||
) -> Union[Set[EntityType], FrozenSet[EntityType]]: | ||
""" | ||
Returns subset of entities that have had new things since the given | ||
position. Entities unknown to the cache will be returned. If the | ||
position is too old it will just return the given list. | ||
Returns the subset of the given entities that have had changes after the given position. | ||
|
||
Entities unknown to the cache will be returned. | ||
|
||
If the position is too old it will just return the given list. | ||
|
||
Args: | ||
entities: Entities to check for changes. | ||
stream_pos: The stream position to check for changes after. | ||
|
||
Return: | ||
A subset of entities which have changed after the given stream position. | ||
|
||
This will be all entities if the given stream position is at or earlier | ||
than the earliest known stream position. | ||
""" | ||
changed_entities = self.get_all_entities_changed(stream_pos) | ||
if changed_entities is not None: | ||
# We now do an intersection, trying to do so in the most efficient | ||
# way possible (some of these sets are *large*). First check in the | ||
# given iterable is already set that we can reuse, otherwise we | ||
# given iterable is already a set that we can reuse, otherwise we | ||
# create a set of the *smallest* of the two iterables and call | ||
# `intersection(..)` on it (this can be twice as fast as the reverse). | ||
if isinstance(entities, (set, frozenset)): | ||
|
@@ -130,29 +174,57 @@ def get_entities_changed( | |
return result | ||
|
||
def has_any_entity_changed(self, stream_pos: int) -> bool: | ||
"""Returns if any entity has changed""" | ||
assert type(stream_pos) is int | ||
""" | ||
Returns true if any entity has changed after the given stream position. | ||
|
||
Args: | ||
stream_pos: The stream position to check for changes after. | ||
|
||
Return: | ||
True if any entity has changed after the given stream position or | ||
if the given stream position is at or earlier than the earliest | ||
known stream position. | ||
|
||
False otherwise. | ||
""" | ||
assert isinstance(stream_pos, int) | ||
|
||
if not self._cache: | ||
# If the cache is empty, nothing can have changed. | ||
return False | ||
|
||
if stream_pos >= self._earliest_known_stream_pos: | ||
self.metrics.inc_hits() | ||
return self._cache.bisect_right(stream_pos) < len(self._cache) | ||
else: | ||
# _cache is not valid at or before the earliest known stream position, so | ||
# return that an entity has changed. | ||
if stream_pos <= self._earliest_known_stream_pos: | ||
self.metrics.inc_misses() | ||
return True | ||
|
||
self.metrics.inc_hits() | ||
return stream_pos < self._cache.peekitem()[0] | ||
|
||
def get_all_entities_changed(self, stream_pos: int) -> Optional[List[EntityType]]: | ||
"""Returns all entities that have had new things since the given | ||
position. If the position is too old it will return None. | ||
""" | ||
Returns all entities that have had changes after the given position. | ||
|
||
If the stream change cache does not go far enough back, i.e. the position | ||
is too old, it will return None. | ||
|
||
Returns the entities in the order that they were changed. | ||
|
||
Args: | ||
stream_pos: The stream position to check for changes after. | ||
|
||
Return: | ||
Entities which have changed after the given stream position. | ||
|
||
None if the given stream position is at or earlier than the earliest | ||
known stream position. | ||
""" | ||
assert type(stream_pos) is int | ||
assert isinstance(stream_pos, int) | ||
|
||
if stream_pos < self._earliest_known_stream_pos: | ||
# _cache is not valid at or before the earliest known stream position, so | ||
# return None to mark that it is unknown if an entity has changed. | ||
if stream_pos <= self._earliest_known_stream_pos: | ||
return None | ||
|
||
changed_entities: List[EntityType] = [] | ||
|
@@ -162,11 +234,17 @@ def get_all_entities_changed(self, stream_pos: int) -> Optional[List[EntityType] | |
return changed_entities | ||
|
||
def entity_has_changed(self, entity: EntityType, stream_pos: int) -> None: | ||
"""Informs the cache that the entity has been changed at the given | ||
position. | ||
""" | ||
assert type(stream_pos) is int | ||
Informs the cache that the entity has been changed at the given position. | ||
|
||
Args: | ||
entity: The entity to mark as changed. | ||
stream_pos: The stream position to update the entity to. | ||
""" | ||
assert isinstance(stream_pos, int) | ||
|
||
# For a change before _cache is valid (e.g. at or before the earliest known | ||
# stream position) there's nothing to do. | ||
if stream_pos <= self._earliest_known_stream_pos: | ||
return | ||
|
||
|
@@ -189,6 +267,11 @@ def entity_has_changed(self, entity: EntityType, stream_pos: int) -> None: | |
self._evict() | ||
|
||
def _evict(self) -> None: | ||
""" | ||
Ensure the cache has not exceeded the maximum size. | ||
|
||
Evicts entries until it is at the maximum size. | ||
""" | ||
# if the cache is too big, remove entries | ||
while len(self._cache) > self._max_size: | ||
k, r = self._cache.popitem(0) | ||
|
@@ -199,5 +282,12 @@ def _evict(self) -> None: | |
def get_max_pos_of_last_change(self, entity: EntityType) -> int: | ||
"""Returns an upper bound of the stream id of the last change to an | ||
entity. | ||
|
||
Args: | ||
entity: The entity to check. | ||
|
||
Return: | ||
The stream position of the latest change for the given entity or | ||
the earliest known stream position if the entitiy is unknown. | ||
""" | ||
return self._entity_to_key.get(entity, self._earliest_known_stream_pos) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -51,6 +51,8 @@ def test_has_entity_changed(self) -> None: | |
# return True, whether it's a known entity or not. | ||
self.assertTrue(cache.has_entity_changed("[email protected]", 0)) | ||
self.assertTrue(cache.has_entity_changed("[email protected]", 0)) | ||
self.assertTrue(cache.has_entity_changed("[email protected]", 3)) | ||
self.assertTrue(cache.has_entity_changed("[email protected]", 3)) | ||
|
||
def test_entity_has_changed_pops_off_start(self) -> None: | ||
""" | ||
|
@@ -65,26 +67,25 @@ def test_entity_has_changed_pops_off_start(self) -> None: | |
|
||
# The cache is at the max size, 2 | ||
self.assertEqual(len(cache._cache), 2) | ||
# The cache's earliest known position is 2. | ||
self.assertEqual(cache._earliest_known_stream_pos, 2) | ||
|
||
# The oldest item has been popped off | ||
self.assertTrue("[email protected]" not in cache._entity_to_key) | ||
|
||
self.assertEqual( | ||
cache.get_all_entities_changed(2), | ||
["[email protected]", "[email protected]"], | ||
) | ||
self.assertIsNone(cache.get_all_entities_changed(1)) | ||
self.assertEqual(cache.get_all_entities_changed(3), ["[email protected]"]) | ||
self.assertIsNone(cache.get_all_entities_changed(2)) | ||
|
||
# If we update an existing entity, it keeps the two existing entities | ||
cache.entity_has_changed("[email protected]", 5) | ||
self.assertEqual( | ||
{"[email protected]", "[email protected]"}, set(cache._entity_to_key) | ||
) | ||
self.assertEqual( | ||
cache.get_all_entities_changed(2), | ||
cache.get_all_entities_changed(3), | ||
["[email protected]", "[email protected]"], | ||
) | ||
self.assertIsNone(cache.get_all_entities_changed(1)) | ||
self.assertIsNone(cache.get_all_entities_changed(2)) | ||
|
||
def test_get_all_entities_changed(self) -> None: | ||
""" | ||
|
@@ -99,28 +100,15 @@ def test_get_all_entities_changed(self) -> None: | |
cache.entity_has_changed("[email protected]", 3) | ||
cache.entity_has_changed("[email protected]", 4) | ||
|
||
r = cache.get_all_entities_changed(1) | ||
r = cache.get_all_entities_changed(2) | ||
|
||
# either of these are valid | ||
ok1 = [ | ||
"[email protected]", | ||
"[email protected]", | ||
"[email protected]", | ||
"[email protected]", | ||
] | ||
ok2 = [ | ||
"[email protected]", | ||
"[email protected]", | ||
"[email protected]", | ||
"[email protected]", | ||
] | ||
# Results are ordered so either of these are valid. | ||
ok1 = ["[email protected]", "[email protected]", "[email protected]"] | ||
ok2 = ["[email protected]", "[email protected]", "[email protected]"] | ||
self.assertTrue(r == ok1 or r == ok2) | ||
|
||
r = cache.get_all_entities_changed(2) | ||
self.assertTrue(r == ok1[1:] or r == ok2[1:]) | ||
|
||
self.assertEqual(cache.get_all_entities_changed(3), ["[email protected]"]) | ||
self.assertEqual(cache.get_all_entities_changed(0), None) | ||
self.assertEqual(cache.get_all_entities_changed(1), None) | ||
|
||
# ... later, things gest more updates | ||
cache.entity_has_changed("[email protected]", 5) | ||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To use
peekitem()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also poked packagers and it seemed 2.x was available everywhere.