Session Token Management APIs #36971
Merged
Changes from 67 of 75 commits
2950e20  merge from main and resolve conflicts
7a1a1eb  remove async keyword from changeFeed query in aio package
b6c53fb  refactor
5f16b14  refactor
36990ef  fix pylint
3c569e8  added public surface methods
7479b0c  pylint fix (tvaron3)
2e76620  fix
56bbb9e  added functionality for merging session tokens from logical pk
8c0aa46  fix mypy (tvaron3)
28394b9  added tests for basic merge and split
25c3363  resolve comments (tvaron3)
cecdfa5  resolve comments
65ed132  resolve comments
4bb30d2  resolve comments
5addcdc  fix pylint
59814d7  fix mypy
ec79b94  merge feed range changes
66c3f7b  fix tests (tvaron3)
1e7a268  merged with feed range branch
997b6b0  Merge branch 'main' of https://github.com/Azure/azure-sdk-for-python … (tvaron3)
7eda72f  Merge branch 'main' into addFeedRangeSupportInChangeFeed (tvaron3)
3a2e4e1  add tests
0883dac  fix pylint
b7d1210  Merge branch 'addFeedRangeSupportInChangeFeed' of https://github.com/…
195c47c  fix and resolve comments (tvaron3)
246b1be  fix and resolve comments
10fe387  Added isSubsetFeedRange logic
6498311  Added request context to crud operations, session token helpers (tvaron3)
5a13ddf  Merge branch 'addFeedRangeSupportInChangeFeed' of https://github.com/… (tvaron3)
f5d0d7b  Merge branch 'main' into addFeedRangeSupportInChangeFeed (tvaron3)
5cde59b  revert unnecessary change
a494346  Added more tests
0d75607  Merge branch 'main' of https://github.com/Azure/azure-sdk-for-python … (tvaron3)
c8c099f  Merge branch 'addFeedRangeSupportInChangeFeed' of https://github.com/… (tvaron3)
ad3ae4f  Added more tests (tvaron3)
8f466a1  merge with main (tvaron3)
5249d0a  Changed tests to use new public feed range and more test coverage for… (tvaron3)
40523f5  Added more tests (tvaron3)
9f88b4e  Fix tests and add changelog (tvaron3)
7c23e87  fix spell checks (tvaron3)
4d0b058  Merge branch 'main' of https://github.com/Azure/azure-sdk-for-python … (tvaron3)
d7c598e  Added tests and pushed request context to client level (tvaron3)
8698098  Added async methods and removed feed range from request context (tvaron3)
c252d88  fix tests (tvaron3)
51e721b  fix tests and pylint (tvaron3)
923055b  Merge branch 'main' of https://github.com/Azure/azure-sdk-for-python … (tvaron3)
104e341  Reacting to comments (tvaron3)
5552912  Reacting to comments (tvaron3)
1bbbd0f  pylint and added hpk tests (tvaron3)
a9299ab  reacting to comments (tvaron3)
2155016  fix tests and mypy (tvaron3)
0436355  fix mypy (tvaron3)
103eb41  fix mypy (tvaron3)
76451df  reacting to comments (tvaron3)
7b0f4b7  reacting to comments (tvaron3)
5d7b978  reacting to comments (tvaron3)
d54992f  fix cspell (tvaron3)
fa16830  rename method to get_latest_session_token (tvaron3)
b2ac9d8  Merge branch 'main' of https://github.com/Azure/azure-sdk-for-python … (tvaron3)
6914a20  reacting to reverted feed range (tvaron3)
ab9723a  change based on the api review (tvaron3)
8a4305d  Reacting to API review and adding samples.
3a1f160  Reacting to API review and adding samples. (tvaron3)
4bc16b1  Merge branch 'main' into tvaron3/sessionTokenHelper (tvaron3)
900d001  Fixed pylint (tvaron3)
96a165f  Merge branch 'tvaron3/sessionTokenHelper' of https://github.com/tvaro… (tvaron3)
eab1822  Reacting to comments (tvaron3)
97ffec7  Reacting to comments (tvaron3)
2264465  Reacting to comments (tvaron3)
35588fa  Reacting to comments (tvaron3)
c42966f  Fix pydoc (tvaron3)
786e357  Fix pydoc (tvaron3)
0de21b4  reacting to comments (tvaron3)
d32a6f1  reacting to comments (tvaron3)
sdk/cosmos/azure-cosmos/azure/cosmos/_session_token_helpers.py (224 additions, 0 deletions)
@@ -0,0 +1,224 @@
# The MIT License (MIT)
# Copyright (c) 2014 Microsoft Corporation

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

"""Internal Helper functions for manipulating session tokens.
"""
from typing import Tuple, List, Dict, Any

from azure.cosmos._routing.routing_range import Range
from azure.cosmos._vector_session_token import VectorSessionToken
from ._change_feed.feed_range_internal import FeedRangeInternalEpk

# pylint: disable=protected-access


# ex inputs:
# 1. "1:1#51", "1:1#55" -> "1:1#55"
# 2. "0:1#57", "1:1#52" -> "0:1#57"
def merge_session_tokens_with_same_range(session_token1: str, session_token2: str) -> str:
    pk_range_id1, vector_session_token1 = parse_session_token(session_token1)
    pk_range_id2, vector_session_token2 = parse_session_token(session_token2)
    pk_range_id = pk_range_id1
    # The partition key range id could be different in this scenario
    # Ex. get_updated_session_token([("AA", "BB"), "1:1#51"], ("AA", "DD")) -> "1:1#51"
    # Then we input this back into get_updated_session_token after a merge happened
    # get_updated_session_token([("AA", "DD"), "1:1#51", ("AA", "DD"), "0:1#55"], ("AA", "DD")) -> "0:1#55"
    if pk_range_id1 != pk_range_id2:
        pk_range_id = pk_range_id1 \
            if vector_session_token1.global_lsn > vector_session_token2.global_lsn else pk_range_id2
    vector_session_token = vector_session_token1.merge(vector_session_token2)
    return pk_range_id + ":" + vector_session_token.session_token


def is_compound_session_token(session_token: str) -> bool:
    return "," in session_token


def parse_session_token(session_token: str) -> Tuple[str, VectorSessionToken]:
    tokens = session_token.split(":")
    return tokens[0], VectorSessionToken.create(tokens[1])


def split_compound_session_tokens(compound_session_tokens: List[Tuple[Range, str]]) -> List[str]:
    session_tokens = []
    for _, session_token in compound_session_tokens:
        if is_compound_session_token(session_token):
            tokens = session_token.split(",")
            for token in tokens:
                session_tokens.append(token)
        else:
            session_tokens.append(session_token)
    return session_tokens


# ex inputs:
# ["1:1#51", "1:1#55", "1:1#57"] -> ["1:1#57"]
def merge_session_tokens_for_same_partition(session_tokens: List[str]) -> List[str]:
    i = 0
    while i < len(session_tokens):
        j = i + 1
        while j < len(session_tokens):
            pk_range_id1, vector_session_token1 = parse_session_token(session_tokens[i])
            pk_range_id2, vector_session_token2 = parse_session_token(session_tokens[j])
            if pk_range_id1 == pk_range_id2:
                vector_session_token = vector_session_token1.merge(vector_session_token2)
                session_tokens.append(pk_range_id1 + ":" + vector_session_token.session_token)
                remove_session_tokens = [session_tokens[i], session_tokens[j]]
                for token in remove_session_tokens:
                    session_tokens.remove(token)
                i = -1
                break
            j += 1
        i += 1

    return session_tokens


# ex inputs:
# 1. [(("AA", "BB"), "1:1#51"), (("BB", "DD"), "2:1#51"), (("AA", "DD"), "0:1#55")] ->
# [("AA", "DD"), "0:1#55"]
# 2. [(("AA", "BB"), "1:1#57"), (("BB", "DD"), "2:1#58"), (("AA", "DD"), "0:1#55")] ->
# [("AA", "DD"), "1:1#57,2:1#58"]
# 3. [(("AA", "BB"), "1:1#57"), (("BB", "DD"), "2:1#52"), (("AA", "DD"), "0:1#55")] ->
# [("AA", "DD"), "1:1#57,2:1#52,0:1#55"]
# goal here is to detect any obvious merges or splits that happened
# compound session tokens are not considered will just pass them along
def merge_ranges_with_subsets(overlapping_ranges: List[Tuple[Range, str]]) -> List[Tuple[Range, str]]:
    processed_ranges = []
    while len(overlapping_ranges) != 0:  # pylint: disable=too-many-nested-blocks
        feed_range_cmp, session_token_cmp = overlapping_ranges[0]
        # compound session tokens are not considered for merging
        if is_compound_session_token(session_token_cmp):
            processed_ranges.append(overlapping_ranges[0])
            overlapping_ranges.remove(overlapping_ranges[0])
            continue
        _, vector_session_token_cmp = parse_session_token(session_token_cmp)
        subsets = []
        # finding the subset feed ranges of the current feed range
        for j in range(1, len(overlapping_ranges)):
            feed_range = overlapping_ranges[j][0]
            if not is_compound_session_token(overlapping_ranges[j][1]) and \
                    feed_range.is_subset(feed_range_cmp):
                subsets.append(overlapping_ranges[j] + (j,))

        # go through subsets to see if can create current feed range from the subsets
        not_found = True
        j = 0
        while not_found and j < len(subsets):
            merged_range = subsets[j][0]
            session_tokens = [subsets[j][1]]
            merged_indices = [subsets[j][2]]
            if len(subsets) == 1:
                _, vector_session_token = parse_session_token(session_tokens[0])
                if vector_session_token_cmp.global_lsn > vector_session_token.global_lsn:
                    overlapping_ranges.remove(overlapping_ranges[merged_indices[0]])
            else:
                for k, subset in enumerate(subsets):
                    if j == k:
                        continue
                    if merged_range.can_merge(subset[0]):
                        merged_range = merged_range.merge(subset[0])
                        session_tokens.append(subset[1])
                        merged_indices.append(subset[2])
                if feed_range_cmp == merged_range:
                    # if feed range can be created from the subsets
                    # take the subsets if their global lsn is larger
                    # else take the current feed range
                    children_more_updated = True
                    parent_more_updated = True
                    for session_token in session_tokens:
                        _, vector_session_token = parse_session_token(session_token)
                        if vector_session_token_cmp.global_lsn > vector_session_token.global_lsn:
                            children_more_updated = False
                        else:
                            parent_more_updated = False
                    feed_ranges_to_remove = [overlapping_ranges[i] for i in merged_indices]
                    for feed_range_to_remove in feed_ranges_to_remove:
                        overlapping_ranges.remove(feed_range_to_remove)
                    if children_more_updated:
                        overlapping_ranges.append((merged_range, ','.join(map(str, session_tokens))))
                        overlapping_ranges.remove(overlapping_ranges[0])
                    elif not parent_more_updated and not children_more_updated:
                        session_tokens.append(session_token_cmp)
                        overlapping_ranges.append((merged_range, ','.join(map(str, session_tokens))))
                    not_found = False
                    break

            j += 1

        processed_ranges.append(overlapping_ranges[0])
        overlapping_ranges.remove(overlapping_ranges[0])
    return processed_ranges


def get_latest_session_token(feed_ranges_to_session_tokens: List[Tuple[Dict[str, Any], str]],
                             target_feed_range: Dict[str, Any]):

    target_feed_range_epk = FeedRangeInternalEpk.from_json(target_feed_range)
    target_feed_range_normalized = target_feed_range_epk.get_normalized_range()
    # filter out tuples that overlap with target_feed_range and normalizes all the ranges
    overlapping_ranges = []
    for feed_range_to_session_token in feed_ranges_to_session_tokens:
        feed_range_epk = FeedRangeInternalEpk.from_json(feed_range_to_session_token[0])
        if Range.overlaps(target_feed_range_normalized,
                          feed_range_epk.get_normalized_range()):
            overlapping_ranges.append((feed_range_epk.get_normalized_range(),
                                       feed_range_to_session_token[1]))

    if len(overlapping_ranges) == 0:
        raise ValueError('There were no overlapping feed ranges with the target.')

    # merge any session tokens that are the same exact feed range
    i = 0
    j = 1
    while i < len(overlapping_ranges) and j < len(overlapping_ranges):
        cur_feed_range = overlapping_ranges[i][0]
        session_token = overlapping_ranges[i][1]
        session_token_1 = overlapping_ranges[j][1]
        if (not is_compound_session_token(session_token) and
                not is_compound_session_token(overlapping_ranges[j][1]) and
                cur_feed_range == overlapping_ranges[j][0]):
            session_token = merge_session_tokens_with_same_range(session_token, session_token_1)
            feed_ranges_to_remove = [overlapping_ranges[i], overlapping_ranges[j]]
            for feed_range_to_remove in feed_ranges_to_remove:
                overlapping_ranges.remove(feed_range_to_remove)
            overlapping_ranges.append((cur_feed_range, session_token))
            i, j = 0, 1
        else:
            j += 1
            if j == len(overlapping_ranges):
                i += 1
                j = i + 1

    # checking for merging of feed ranges that can be created from other feed ranges
    processed_ranges = merge_ranges_with_subsets(overlapping_ranges)

    # break up session tokens that are compound
    remaining_session_tokens = split_compound_session_tokens(processed_ranges)

    if len(remaining_session_tokens) == 1:
        return remaining_session_tokens[0]
    # merging any session tokens with same physical partition key range id
    remaining_session_tokens = merge_session_tokens_for_same_partition(remaining_session_tokens)

    updated_session_token = ""
    # compound the remaining session tokens
    for i, remaining_session_token in enumerate(remaining_session_tokens):
        if i == len(remaining_session_tokens) - 1:
            updated_session_token += remaining_session_token
        else:
            updated_session_token += remaining_session_token + ","

    return updated_session_token
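Below is a minimal usage sketch of the string-level helpers added above; it is illustrative only and not part of the diff. It assumes a build of azure-cosmos that contains this change, so that the private module azure.cosmos._session_token_helpers is importable (internal APIs may change without notice). The token values are the same illustrative ones used in the file's own comments; get_latest_session_token itself would additionally need feed-range dictionaries obtained from a container, which are omitted here.

```python
# Illustrative only; assumes azure-cosmos with _session_token_helpers installed.
from azure.cosmos._session_token_helpers import (
    is_compound_session_token,
    merge_session_tokens_with_same_range,
    merge_session_tokens_for_same_partition,
    parse_session_token,
)

# Tokens have the form "<pk range id>:<vector session token>", e.g. "1:1#55",
# where the number after '#' is the global LSN (values taken from the comments above).
print(is_compound_session_token("1:1#57,2:1#58"))  # prints True
print(parse_session_token("1:1#51")[0])            # prints 1 (the partition key range id)

# Two tokens for the same physical partition key range keep the higher LSN.
print(merge_session_tokens_with_same_range("1:1#51", "1:1#55"))  # prints 1:1#55

# Several tokens for the same partition collapse to a single token.
print(merge_session_tokens_for_same_partition(["1:1#51", "1:1#55", "1:1#57"]))  # prints ['1:1#57']
```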